Hi Becket,

Introducing CacheHandle seems too complicated: it means users have to manage the handle's lifecycle properly.
And since the cache is just a hint for the optimizer, why not have the cache method return the Table itself? This hint info should be kept in the Table, I believe. So how about adding cache and uncache methods to Table, both returning Table? Because all that cache and uncache do is attach some hint info to the Table.

Becket Qin <becket....@gmail.com> wrote on Wednesday, December 12, 2018 at 11:25 AM:

> Hi Till and Piotrek, > > Thanks for the clarification. That resolves quite a few confusions. My > understanding of how cache works is the same as what Till described, i.e. > cache() is a hint to Flink, but it is not guaranteed that the cache always > exists and it might be recomputed from its lineage. > > Is this the core of our disagreement here? That you would like this > > “cache()” to be mostly a hint for the optimiser? > > Semantics-wise, yes. That's also why I think materialize() has a much larger > scope than cache(), thus it should be a different method. > > Regarding the chance of optimization, it might not be that rare. Some very > simple statistics could already help in many cases. For example, simply > maintaining the max and min of each field can already eliminate some > unnecessary table scans (potentially scanning the cached table) if the > result is doomed to be empty. A histogram would give even further > information. The optimizer could be very careful and only ignore the cache > when it is 100% sure doing so is cheaper, e.g. only when a filter on the > cache will absolutely return nothing. > > Given the above clarification on cache, I would like to revisit the > original "void cache()" proposal and see if we can improve on top of that. > > What do you think about the following modified interface? > > Table { > /** > * This call hints Flink to maintain a cache of this table and leverage > it for performance optimization if needed. > * Note that Flink may still decide not to use the cache if that is cheaper. > * > * A CacheHandle will be returned to allow the user to release the cache > actively.
The cache will be deleted if there > * are no unreleased cache handles to it. When the TableEnvironment is > closed, the cache will also be deleted > * and all the cache handles will be released. > * > * @return a CacheHandle referring to the cache of this table. > */ > CacheHandle cache(); > } > > CacheHandle { > /** > * Close the cache handle. This method does not necessarily delete the > cache. Instead, it simply decrements the reference counter to the cache. > * When there is no handle referring to a cache, the cache will be > deleted. > * > * @return the number of open handles to the cache after this handle has > been released. > */ > int release(); > } > > The rationale behind this interface is the following: > In the vast majority of cases, users wouldn't really care whether the cache > is used or not. So I think the most intuitive way is letting cache() return > nothing, so nobody needs to worry about the difference between operations > on CachedTables and those on the "original" tables. This will make maybe > 99.9% of the users happy. There were two concerns raised for this approach: > 1. In some rare cases, users may want to ignore the cache. > 2. A table might be cached/uncached in a third-party function while the > caller does not know. > > For the first issue, users can use hint("ignoreCache") to explicitly ignore > the cache. > For the second issue, the above proposal lets cache() return a CacheHandle > whose only method is release(). Different CacheHandles will refer to > the same cache; if a cache no longer has any cache handle, it will be > deleted. This will address the following case: > { > val handle1 = a.cache() > process(a) > a.select(...) // cache is still available, handle1 has not been released. > } > > void process(Table t) { > val handle2 = t.cache() // new handle to the cache > t.select(...) // optimizer decides cache usage > t.hint("ignoreCache").select(...)
// cache is ignored > handle2.release() // release the handle, but the cache may still be > available if there are other handles > ... > } > > Does the above modified approach look reasonable to you? > > Cheers, > > Jiangjie (Becket) Qin > > > > > > > > On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrm...@apache.org> > wrote: > > > Hi Becket, > > > > I was aiming at semantics similar to 1. I actually thought that `cache()` > > would tell the system to materialize the intermediate result so that > > subsequent queries don't need to reprocess it. This means that the usage > of > > the cached table in this example > > > > { > > val cachedTable = a.cache() > > val b1 = cachedTable.select(…) > > val b2 = cachedTable.foo().select(…) > > val b3 = cachedTable.bar().select(...) > > val c1 = a.select(…) > > val c2 = a.foo().select(…) > > val c3 = a.bar().select(...) > > } > > > > strongly depends on interleaved calls which trigger the execution of sub > > queries. So for example, if there is only a single env.execute call at > the > > end of block, then b1, b2, b3, c1, c2 and c3 would all be computed by > > reading directly from the sources (given that there is only a single > > JobGraph). It just happens that the result of `a` will be cached such > that > > we skip the processing of `a` when there are subsequent queries reading > > from `cachedTable`. If for some reason the system cannot materialize the > > table (e.g. running out of disk space, ttl expired), then it could also > > happen that we need to reprocess `a`. In that sense `cachedTable` simply > is > > an identifier for the materialized result of `a` with the lineage how to > > reprocess it. > > > > Cheers, > > Till > > > > > > > > > > > > On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <pi...@data-artisans.com > > > > wrote: > > > > > Hi Becket, > > > > > > > { > > > > val cachedTable = a.cache() > > > > val b = cachedTable.select(...) > > > > val c = a.select(...) > > > > } > > > > > > > > Semantic 1. 
b uses cachedTable as user demanded so. c uses original > DAG > > > as > > > > user demanded so. In this case, the optimizer has no chance to > > optimize. > > > > Semantic 2. b uses cachedTable as user demanded so. c leaves the > > > optimizer > > > > to choose whether the cache or DAG should be used. In this case, user > > > lose > > > > the option to NOT use cache. > > > > > > > > As you can see, neither of the options seem perfect. However, I guess > > you > > > > and Till are proposing the third option: > > > > > > > > Semantic 3. b leaves the optimizer to choose whether cache or DAG > > should > > > be > > > > used. c always use the DAG. > > > > > > I am pretty sure that me, Till, Fabian and others were all proposing > and > > > advocating in favour of semantic “1”. No cost based optimiser decisions > > at > > > all. > > > > > > { > > > val cachedTable = a.cache() > > > val b1 = cachedTable.select(…) > > > val b2 = cachedTable.foo().select(…) > > > val b3 = cachedTable.bar().select(...) > > > val c1 = a.select(…) > > > val c2 = a.foo().select(…) > > > val c3 = a.bar().select(...) > > > } > > > > > > All b1, b2 and b3 are reading from cache, while c1, c2 and c3 are > > > re-executing whole plan for “a”. > > > > > > In the future we could discuss going one step further, introducing some > > > global optimisation (that can be manually enabled/disabled): > deduplicate > > > plan nodes/deduplicate sub queries/re-use sub queries results/or > whatever > > > we could call it. It could do two things: > > > > > > 1. Automatically try to deduplicate fragments of the plan and share the > > > result using CachedTable - in other words automatically insert > > `CachedTable > > > cache()` calls. > > > 2. Automatically make decision to bypass explicit `CachedTable` access > > > (this would be the equivalent of what you described as “semantic 3”). 
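[Editor's note: the competing semantics above — the planner may serve a read from the cache, while an explicit hint forces re-execution of the original DAG — can be sketched as a tiny stand-alone model. This is Python pseudocode of the discussed behavior; `HintedTable`, `rows()`, and the hint name are illustrative stand-ins, not Flink APIs.]

```python
class HintedTable:
    """Toy model of 'cache as a hint': reads prefer the cache when one
    exists, and hint("ignoreCache") forces recomputation from the lineage.
    All names and behavior here are illustrative, not Flink APIs."""
    def __init__(self, compute, cache_store=None, hints=frozenset()):
        self._compute = compute                 # how to recompute from the lineage
        self._cache_store = cache_store if cache_store is not None else {}
        self._hints = hints

    def cache(self):
        self._cache_store["result"] = self._compute()
        return self                             # returns this table for chaining

    def hint(self, name):
        # A hint produces a derived table sharing the same underlying cache.
        return HintedTable(self._compute, self._cache_store, self._hints | {name})

    def rows(self):
        if "ignoreCache" in self._hints or "result" not in self._cache_store:
            return self._compute()              # bypass or no cache: use the DAG
        return self._cache_store["result"]      # otherwise serve from the cache
```

In this model `t.rows()` reads from the cache once it exists, while `t.hint("ignoreCache").rows()` always re-runs the computation, mirroring the "user can explicitly opt out" option debated above.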
> > > > > > However as I wrote previously, I have big doubts if such cost-based > > > optimisation would work (this applies also to “Semantic 2”). I would > > expect > > > it to do more harm than good in so many cases, that it wouldn’t make > > sense. > > > Even assuming that we calculate statistics perfectly (this ain’t gonna > > > happen), it’s virtually impossible to correctly estimate correct > exchange > > > rate of CPU cycles vs IO operations as it is changing so much from > > > deployment to deployment. > > > > > > Is this the core of our disagreement here? That you would like this > > > “cache()” to be mostly hint for the optimiser? > > > > > > Piotrek > > > > > > > On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com> wrote: > > > > > > > > Another potential concern for semantic 3 is that. In the future, we > may > > > add > > > > automatic caching to Flink. e.g. cache the intermediate results at > the > > > > shuffle boundary. If our semantic is that reference to the original > > table > > > > means skipping cache, those users may not be able to benefit from the > > > > implicit cache. > > > > > > > > > > > > > > > > On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket....@gmail.com> > > > wrote: > > > > > > > >> Hi Piotrek, > > > >> > > > >> Thanks for the reply. Thought about it again, I might have > > misunderstood > > > >> your proposal in earlier emails. Returning a CachedTable might not > be > > a > > > bad > > > >> idea. > > > >> > > > >> I was more concerned about the semantic and its intuitiveness when a > > > >> CachedTable is returned. i..e, if cache() returns CachedTable. What > > are > > > the > > > >> semantic in the following code: > > > >> { > > > >> val cachedTable = a.cache() > > > >> val b = cachedTable.select(...) > > > >> val c = a.select(...) > > > >> } > > > >> What is the difference between b and c? At the first glance, I see > two > > > >> options: > > > >> > > > >> Semantic 1. b uses cachedTable as user demanded so. 
c uses original > > DAG > > > as > > > >> user demanded so. In this case, the optimizer has no chance to > > optimize. > > > >> Semantic 2. b uses cachedTable as user demanded so. c leaves the > > > optimizer > > > >> to choose whether the cache or DAG should be used. In this case, > user > > > lose > > > >> the option to NOT use cache. > > > >> > > > >> As you can see, neither of the options seem perfect. However, I > guess > > > you > > > >> and Till are proposing the third option: > > > >> > > > >> Semantic 3. b leaves the optimizer to choose whether cache or DAG > > should > > > >> be used. c always use the DAG. > > > >> > > > >> This does address all the concerns. It is just that from > intuitiveness > > > >> perspective, I found that asking user to explicitly use a > CachedTable > > > while > > > >> the optimizer might choose to ignore is a little weird. That was > why I > > > did > > > >> not think about that semantic. But given there is material benefit, > I > > > think > > > >> this semantic is acceptable. > > > >> > > > >> 1. If we want to let optimiser make decisions whether to use cache > or > > > not, > > > >>> then why do we need “void cache()” method at all? Would It > > “increase” > > > the > > > >>> chance of using the cache? That’s sounds strange. What would be the > > > >>> mechanism of deciding whether to use the cache or not? If we want > to > > > >>> introduce such kind automated optimisations of “plan nodes > > > deduplication” > > > >>> I would turn it on globally, not per table, and let the optimiser > do > > > all of > > > >>> the work. > > > >>> 2. We do not have statistics at the moment for any use/not use > cache > > > >>> decision. > > > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost > > > based > > > >>> optimisations would work properly and I would still insist first on > > > >>> providing explicit caching mechanism (`CachedTable cache()`) > > > >>> > > > >> We are absolutely on the same page here. 
An explicit cache() method > is > > > >> necessary not only because optimizer may not be able to make the > right > > > >> decision, but also because of the nature of interactive programming. > > For > > > >> example, if users write the following code in Scala shell: > > > >> val b = a.select(...) > > > >> val c = b.select(...) > > > >> val d = c.select(...).writeToSink(...) > > > >> tEnv.execute() > > > >> There is no way optimizer will know whether b or c will be used in > > later > > > >> code, unless users hint explicitly. > > > >> > > > >> At the same time I’m not sure if you have responded to our > objections > > of > > > >>> `void cache()` being implicit/having side effects, which me, Jark, > > > Fabian, > > > >>> Till and I think also Shaoxuan are supporting. > > > >> > > > >> Is there any other side effects if we use semantic 3 mentioned > above? > > > >> > > > >> Thanks, > > > >> > > > >> JIangjie (Becket) Qin > > > >> > > > >> > > > >> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski < > > pi...@data-artisans.com > > > > > > > >> wrote: > > > >> > > > >>> Hi Becket, > > > >>> > > > >>> Sorry for not responding long time. > > > >>> > > > >>> Regarding case1. > > > >>> > > > >>> There wouldn’t be no “a.unCache()” method, but I would expect only > > > >>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t > affect > > > >>> `cachedTableA2`. Just as in any other database dropping modifying > one > > > >>> independent table/materialised view does not affect others. > > > >>> > > > >>>> What I meant is that assuming there is already a cached table, > > ideally > > > >>> users need > > > >>>> not to specify whether the next query should read from the cache > or > > > use > > > >>> the > > > >>>> original DAG. This should be decided by the optimizer. > > > >>> > > > >>> 1. If we want to let optimiser make decisions whether to use cache > or > > > >>> not, then why do we need “void cache()” method at all? 
Would It > > > “increase” > > > >>> the chance of using the cache? That’s sounds strange. What would be > > the > > > >>> mechanism of deciding whether to use the cache or not? If we want > to > > > >>> introduce such kind automated optimisations of “plan nodes > > > deduplication” > > > >>> I would turn it on globally, not per table, and let the optimiser > do > > > all of > > > >>> the work. > > > >>> 2. We do not have statistics at the moment for any use/not use > cache > > > >>> decision. > > > >>> 3. Even if we had, I would be veeerryy sceptical whether such cost > > > based > > > >>> optimisations would work properly and I would still insist first on > > > >>> providing explicit caching mechanism (`CachedTable cache()`) > > > >>> 4. As Till wrote, having explicit `CachedTable cache()` doesn’t > > > >>> contradict future work on automated cost based caching. > > > >>> > > > >>> > > > >>> At the same time I’m not sure if you have responded to our > objections > > > of > > > >>> `void cache()` being implicit/having side effects, which me, Jark, > > > Fabian, > > > >>> Till and I think also Shaoxuan are supporting. > > > >>> > > > >>> Piotrek > > > >>> > > > >>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com> wrote: > > > >>>> > > > >>>> Hi Till, > > > >>>> > > > >>>> It is true that after the first job submission, there will be no > > > >>> ambiguity > > > >>>> in terms of whether a cached table is used or not. That is the > same > > > for > > > >>> the > > > >>>> cache() without returning a CachedTable. > > > >>>> > > > >>>> Conceptually one could think of cache() as introducing a caching > > > >>> operator > > > >>>>> from which you need to consume from if you want to benefit from > the > > > >>> caching > > > >>>>> functionality. > > > >>>> > > > >>>> I am thinking a little differently. I think it is a hint (as you > > > >>> mentioned > > > >>>> later) instead of a new operator. 
I'd like to be careful about the > > > >>> semantic > > > >>>> of the API. A hint is a property set on an existing operator, but > is > > > not > > > >>>> itself an operator as it does not really manipulate the data. > > > >>>> > > > >>>> I agree, ideally the optimizer makes this kind of decision which > > > >>>>> intermediate result should be cached. But especially when > executing > > > >>> ad-hoc > > > >>>>> queries the user might better know which results need to be > cached > > > >>> because > > > >>>>> Flink might not see the full DAG. In that sense, I would consider > > the > > > >>>>> cache() method as a hint for the optimizer. Of course, in the > > future > > > we > > > >>>>> might add functionality which tries to automatically cache > results > > > >>> (e.g. > > > >>>>> caching the latest intermediate results until so and so much > space > > is > > > >>>>> used). But this should hopefully not contradict with `CachedTable > > > >>> cache()`. > > > >>>> > > > >>>> I agree that cache() method is needed for exactly the reason you > > > >>> mentioned, > > > >>>> i.e. Flink cannot predict what users are going to write later, so > > > users > > > >>>> need to tell Flink explicitly that this table will be used later. > > > What I > > > >>>> meant is that assuming there is already a cached table, ideally > > users > > > >>> need > > > >>>> not to specify whether the next query should read from the cache > or > > > use > > > >>> the > > > >>>> original DAG. This should be decided by the optimizer. > > > >>>> > > > >>>> To explain the difference between returning / not returning a > > > >>> CachedTable, > > > >>>> I want compare the following two case: > > > >>>> > > > >>>> *Case 1: returning a CachedTable* > > > >>>> b = a.map(...) > > > >>>> val cachedTableA1 = a.cache() > > > >>>> val cachedTableA2 = a.cache() > > > >>>> b.print() // Just to make sure a is cached. > > > >>>> > > > >>>> c = a.filter(...) // User specify that the original DAG is used? 
> Or > > > the > > > >>>> optimizer decides whether DAG or cache should be used? > > > >>>> d = cachedTableA1.filter() // User specify that the cached table > is > > > >>> used. > > > >>>> > > > >>>> a.unCache() // Can cachedTableA still be used afterwards? > > > >>>> cachedTableA1.uncache() // Can cachedTableA2 still be used? > > > >>>> > > > >>>> *Case 2: not returning a CachedTable* > > > >>>> b = a.map() > > > >>>> a.cache() > > > >>>> a.cache() // no-op > > > >>>> b.print() // Just to make sure a is cached > > > >>>> > > > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG > > should > > > >>> be > > > >>>> used > > > >>>> d = a.filter(...) // Optimizer decides whether the cache or DAG > > should > > > >>> be > > > >>>> used > > > >>>> > > > >>>> a.unCache() > > > >>>> a.unCache() // no-op > > > >>>> > > > >>>> In case 1, semantic wise, optimizer lose the option to choose > > between > > > >>> DAG > > > >>>> and cache. And the unCache() call becomes tricky. > > > >>>> In case 2, users do not need to worry about whether cache or DAG > is > > > >>> used. > > > >>>> And the unCache() semantic is clear. However, the caveat is that > > users > > > >>>> cannot explicitly ignore the cache. > > > >>>> > > > >>>> In order to address the issues mentioned in case 2 and inspired by > > the > > > >>>> discussion so far, I am thinking about using hint to allow user > > > >>> explicitly > > > >>>> ignore cache. Although we do not have hint yet, but we probably > > should > > > >>> have > > > >>>> one. So the code becomes: > > > >>>> > > > >>>> *Case 3: returning this table* > > > >>>> b = a.map() > > > >>>> a.cache() > > > >>>> a.cache() // no-op > > > >>>> b.print() // Just to make sure a is cached > > > >>>> > > > >>>> c = a.filter(...) // Optimizer decides whether the cache or DAG > > should > > > >>> be > > > >>>> used > > > >>>> d = a.hint("ignoreCache").filter(...) // DAG will be used instead > of > > > the > > > >>>> cache. 
> > > >>>> > > > >>>> a.unCache() > > > >>>> a.unCache() // no-op > > > >>>> > > > >>>> We could also let cache() return this table to allow chained > method > > > >>> calls. > > > >>>> Do you think this API addresses the concerns? > > > >>>> > > > >>>> Thanks, > > > >>>> > > > >>>> Jiangjie (Becket) Qin > > > >>>> > > > >>>> > > > >>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com> wrote: > > > >>>> > > > >>>>> Hi, > > > >>>>> > > > >>>>> All the recent discussions are focused on whether there is a > > problem > > > if > > > >>>>> cache() not return a Table. > > > >>>>> It seems that returning a Table explicitly is more clear (and > > safe?). > > > >>>>> > > > >>>>> So whether there are any problems if cache() returns a Table? > > > @Becket > > > >>>>> > > > >>>>> Best, > > > >>>>> Jark > > > >>>>> > > > >>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrm...@apache.org > > > > > >>> wrote: > > > >>>>> > > > >>>>>> It's true that b, c, d and e will all read from the original DAG > > > that > > > >>>>>> generates a. But all subsequent operators (when running multiple > > > >>> queries) > > > >>>>>> which reference cachedTableA should not need to reproduce `a` > but > > > >>>>> directly > > > >>>>>> consume the intermediate result. > > > >>>>>> > > > >>>>>> Conceptually one could think of cache() as introducing a caching > > > >>> operator > > > >>>>>> from which you need to consume from if you want to benefit from > > the > > > >>>>> caching > > > >>>>>> functionality. > > > >>>>>> > > > >>>>>> I agree, ideally the optimizer makes this kind of decision which > > > >>>>>> intermediate result should be cached. But especially when > > executing > > > >>>>> ad-hoc > > > >>>>>> queries the user might better know which results need to be > cached > > > >>>>> because > > > >>>>>> Flink might not see the full DAG. In that sense, I would > consider > > > the > > > >>>>>> cache() method as a hint for the optimizer. 
Of course, in the > > future > > > >>> we > > > >>>>>> might add functionality which tries to automatically cache > results > > > >>> (e.g. > > > >>>>>> caching the latest intermediate results until so and so much > space > > > is > > > >>>>>> used). But this should hopefully not contradict with > `CachedTable > > > >>>>> cache()`. > > > >>>>>> > > > >>>>>> Cheers, > > > >>>>>> Till > > > >>>>>> > > > >>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket....@gmail.com > > > > > >>> wrote: > > > >>>>>> > > > >>>>>>> Hi Till, > > > >>>>>>> > > > >>>>>>> Thanks for the clarification. I am still a little confused. > > > >>>>>>> > > > >>>>>>> If cache() returns a CachedTable, the example might become: > > > >>>>>>> > > > >>>>>>> b = a.map(...) > > > >>>>>>> c = a.map(...) > > > >>>>>>> > > > >>>>>>> cachedTableA = a.cache() > > > >>>>>>> d = cachedTableA.map(...) > > > >>>>>>> e = a.map() > > > >>>>>>> > > > >>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and > e > > > are > > > >>>>> all > > > >>>>>>> going to be reading from the original DAG that generates a. But > > > with > > > >>> a > > > >>>>>>> naive expectation, d should be reading from the cache. This > seems > > > not > > > >>>>>>> solving the potential confusion you raised, right? > > > >>>>>>> > > > >>>>>>> Just to be clear, my understanding are all based on the > > assumption > > > >>> that > > > >>>>>> the > > > >>>>>>> tables are immutable. Therefore, after a.cache(), a the > > > >>> c*achedTableA* > > > >>>>>> and > > > >>>>>>> original table *a * should be completely interchangeable. > > > >>>>>>> > > > >>>>>>> That said, I think a valid argument is optimization. There are > > > indeed > > > >>>>>> cases > > > >>>>>>> that reading from the original DAG could be faster than reading > > > from > > > >>>>> the > > > >>>>>>> cache. 
For example, in the following example: > > > >>>>>>> > > > >>>>>>> a.filter(f1' > 100) > > > >>>>>>> a.cache() > > > >>>>>>> b = a.filter(f1' < 100) > > > >>>>>>> > > > >>>>>>> Ideally the optimizer should be intelligent enough to decide > > which > > > >>> way > > > >>>>> is > > > >>>>>>> faster, without user intervention. In this case, it will > identify > > > >>> that > > > >>>>> b > > > >>>>>>> would just be an empty table, thus skip reading from the cache > > > >>>>>> completely. > > > >>>>>>> But I agree that returning a CachedTable would give user the > > > control > > > >>> of > > > >>>>>>> when to use cache, even though I still feel that letting the > > > >>> optimizer > > > >>>>>>> handle this is a better option in long run. > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> > > > >>>>>>> Jiangjie (Becket) Qin > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann < > > trohrm...@apache.org > > > > > > > >>>>>> wrote: > > > >>>>>>> > > > >>>>>>>> Yes you are right Becket that it still depends on the actual > > > >>>>> execution > > > >>>>>> of > > > >>>>>>>> the job whether a consumer reads from a cached result or not. > > > >>>>>>>> > > > >>>>>>>> My point was actually about the properties of a (cached vs. > > > >>>>> non-cached) > > > >>>>>>> and > > > >>>>>>>> not about the execution. I would not make cache trigger the > > > >>> execution > > > >>>>>> of > > > >>>>>>>> the job because one loses some flexibility by eagerly > triggering > > > the > > > >>>>>>>> execution. > > > >>>>>>>> > > > >>>>>>>> I tried to argue for an explicit CachedTable which is returned > > by > > > >>> the > > > >>>>>>>> cache() method like Piotr did in order to make the API more > > > >>> explicit. 
> > > >>>>>>>> > > > >>>>>>>> Cheers, > > > >>>>>>>> Till > > > >>>>>>>> > > > >>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin < > becket....@gmail.com > > > > > > >>>>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> Hi Till, > > > >>>>>>>>> > > > >>>>>>>>> That is a good example. Just a minor correction, in this > case, > > > b, c > > > >>>>>>> and d > > > >>>>>>>>> will all consume from a non-cached a. This is because cache > > will > > > >>>>> only > > > >>>>>>> be > > > >>>>>>>>> created on the very first job submission that generates the > > table > > > >>>>> to > > > >>>>>> be > > > >>>>>>>>> cached. > > > >>>>>>>>> > > > >>>>>>>>> If I understand correctly, this is example is about whether > > > >>>>> .cache() > > > >>>>>>>> method > > > >>>>>>>>> should be eagerly evaluated or lazily evaluated. In another > > word, > > > >>>>> if > > > >>>>>>>>> cache() method actually triggers a job that creates the > cache, > > > >>>>> there > > > >>>>>>> will > > > >>>>>>>>> be no such confusion. Is that right? > > > >>>>>>>>> > > > >>>>>>>>> In the example, although d will not consume from the cached > > Table > > > >>>>>> while > > > >>>>>>>> it > > > >>>>>>>>> looks supposed to, from correctness perspective the code will > > > still > > > >>>>>>>> return > > > >>>>>>>>> correct result, assuming that tables are immutable. > > > >>>>>>>>> > > > >>>>>>>>> Personally I feel it is OK because users probably won't > really > > > >>>>> worry > > > >>>>>>>> about > > > >>>>>>>>> whether the table is cached or not. And lazy cache could > avoid > > > some > > > >>>>>>>>> unnecessary caching if a cached table is never created in the > > > user > > > >>>>>>>>> application. But I am not opposed to do eager evaluation of > > > cache. 
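[Editor's note: the lazy-evaluation point above — the cache is only created by the very first job submission that generates the table — can be illustrated with a minimal model. Python sketch; `LazilyCachedTable` and `execute()` are illustrative names, not Flink APIs.]

```python
class LazilyCachedTable:
    """Toy model of lazily evaluated cache(): the call only marks the table,
    and the cache is materialized by the first job that produces it."""
    def __init__(self, compute):
        self._compute = compute       # lineage: how to recompute the table
        self._cache_requested = False
        self._cached = None

    def cache(self):
        self._cache_requested = True  # only a marker: no job is triggered here

    def execute(self):
        """Run a job producing this table; the first run populates the cache."""
        if self._cached is not None:
            return self._cached       # later submissions read the cached result
        result = self._compute()      # first submission reads from the sources
        if self._cache_requested:
            self._cached = result     # materialized as a by-product of the job
        return result
```

Calling `cache()` alone triggers nothing; only after the first `execute()` do subsequent reads skip the recomputation, which is the behavior Till and Becket describe.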
> > > >>>>>>>>> > > > >>>>>>>>> Thanks, > > > >>>>>>>>> > > > >>>>>>>>> Jiangjie (Becket) Qin > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann < > > > >>>>> trohrm...@apache.org> > > > >>>>>>>>> wrote: > > > >>>>>>>>> > > > >>>>>>>>>> Another argument for Piotr's point is that lazily changing > > > >>>>>> properties > > > >>>>>>>> of > > > >>>>>>>>> a > > > >>>>>>>>>> node affects all down stream consumers but does not > > necessarily > > > >>>>>> have > > > >>>>>>> to > > > >>>>>>>>>> happen before these consumers are defined. From a user's > > > >>>>>> perspective > > > >>>>>>>> this > > > >>>>>>>>>> can be quite confusing: > > > >>>>>>>>>> > > > >>>>>>>>>> b = a.map(...) > > > >>>>>>>>>> c = a.map(...) > > > >>>>>>>>>> > > > >>>>>>>>>> a.cache() > > > >>>>>>>>>> d = a.map(...) > > > >>>>>>>>>> > > > >>>>>>>>>> now b, c and d will consume from a cached operator. In this > > > case, > > > >>>>>> the > > > >>>>>>>>> user > > > >>>>>>>>>> would most likely expect that only d reads from a cached > > result. > > > >>>>>>>>>> > > > >>>>>>>>>> Cheers, > > > >>>>>>>>>> Till > > > >>>>>>>>>> > > > >>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski < > > > >>>>>>>> pi...@data-artisans.com> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>>> Hey Shaoxuan and Becket, > > > >>>>>>>>>>> > > > >>>>>>>>>>>> Can you explain a bit more one what are the side effects? > So > > > >>>>>> far > > > >>>>>>> my > > > >>>>>>>>>>>> understanding is that such side effects only exist if a > > table > > > >>>>>> is > > > >>>>>>>>>> mutable. > > > >>>>>>>>>>>> Is that the case? > > > >>>>>>>>>>> > > > >>>>>>>>>>> Not only that. There are also performance implications and > > > >>>>> those > > > >>>>>>> are > > > >>>>>>>>>>> another implicit side effects of using `void cache()`. 
As I > > > >>>>> wrote > > > >>>>>>>>> before, > > > >>>>>>>>>>> reading from cache might not always be desirable, thus it > can > > > >>>>>> cause > > > >>>>>>>>>>> performance degradation and I’m fine with that - user's or > > > >>>>>>>> optimiser’s > > > >>>>>>>>>>> choice. What I do not like is that this implicit side > effect > > > >>>>> can > > > >>>>>>>>> manifest > > > >>>>>>>>>>> in completely different part of code, that wasn’t touched > by > > a > > > >>>>>> user > > > >>>>>>>>> while > > > >>>>>>>>>>> he was adding `void cache()` call somewhere else. And even > if > > > >>>>>>> caching > > > >>>>>>>>>>> improves performance, it’s still a side effect of `void > > > >>>>> cache()`. > > > >>>>>>>>> Almost > > > >>>>>>>>>>> from the definition `void` methods have only side effects. > > As I > > > >>>>>>> wrote > > > >>>>>>>>>>> before, there are couple of scenarios where this might be > > > >>>>>>> undesirable > > > >>>>>>>>>>> and/or unexpected, for example: > > > >>>>>>>>>>> > > > >>>>>>>>>>> 1. > > > >>>>>>>>>>> Table b = …; > > > >>>>>>>>>>> b.cache() > > > >>>>>>>>>>> x = b.join(…) > > > >>>>>>>>>>> y = b.count() > > > >>>>>>>>>>> // ... > > > >>>>>>>>>>> // 100 > > > >>>>>>>>>>> // hundred > > > >>>>>>>>>>> // lines > > > >>>>>>>>>>> // of > > > >>>>>>>>>>> // code > > > >>>>>>>>>>> // later > > > >>>>>>>>>>> z = b.filter(…).groupBy(…) // this might be even hidden in > a > > > >>>>>>>> different > > > >>>>>>>>>>> method/file/package/dependency > > > >>>>>>>>>>> > > > >>>>>>>>>>> 2. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Table b = ... 
> > > >>>>>>>>>>> If (some_condition) { > > > >>>>>>>>>>> foo(b) > > > >>>>>>>>>>> } > > > >>>>>>>>>>> Else { > > > >>>>>>>>>>> bar(b) > > > >>>>>>>>>>> } > > > >>>>>>>>>>> z = b.filter(…).groupBy(…) > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> Void foo(Table b) { > > > >>>>>>>>>>> b.cache() > > > >>>>>>>>>>> // do something with b > > > >>>>>>>>>>> } > > > >>>>>>>>>>> > > > >>>>>>>>>>> In both above examples, `b.cache()` will implicitly affect > > > >>>>>>> (semantic > > > >>>>>>>>> of a > > > >>>>>>>>>>> program in case of sources being mutable and performance) > `z > > = > > > >>>>>>>>>>> b.filter(…).groupBy(…)` which might be far from obvious. > > > >>>>>>>>>>> > > > >>>>>>>>>>> On top of that, there is still this argument of mine that > > > >>>>> having > > > >>>>>> a > > > >>>>>>>>>>> `MaterializedTable` or `CachedTable` handle is more > flexible > > > >>>>> for > > > >>>>>> us > > > >>>>>>>> for > > > >>>>>>>>>> the > > > >>>>>>>>>>> future and for the user (as a manual option to bypass cache > > > >>>>>> reads). > > > >>>>>>>>>>> > > > >>>>>>>>>>>> But Jiangjie is correct, > > > >>>>>>>>>>>> the source table in batching should be immutable. It is > the > > > >>>>>>> user’s > > > >>>>>>>>>>>> responsibility to ensure it, otherwise even a regular > > > >>>>> failover > > > >>>>>>> may > > > >>>>>>>>> lead > > > >>>>>>>>>>>> to inconsistent results. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Yes, I agree that’s what perfect world/good deployment > should > > > >>>>> be. > > > >>>>>>> But > > > >>>>>>>>> its > > > >>>>>>>>>>> often isn’t and while I’m not trying to fix this (since the > > > >>>>>> proper > > > >>>>>>>> fix > > > >>>>>>>>> is > > > >>>>>>>>>>> to support transactions), I’m just trying to minimise > > confusion > > > >>>>>> for > > > >>>>>>>> the > > > >>>>>>>>>>> users that are not fully aware what’s going on and operate > in > > > >>>>>> less > > > >>>>>>>> then > > > >>>>>>>>>>> perfect setup. 
And if something bites them after adding a `b.cache()` call, I want to make sure that they at least know all of the places that adding this line can affect.

Thanks,
Piotrek

> On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

Thanks again for the clarification. Some more replies follow.

> But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching.

It is true. Actually, in stream processing cache() has the same semantics as in batch processing. The semantics are the following: for a table created via a series of computations, save that table for later reference, to avoid running the computation logic again to regenerate the table. Once the application exits, drop all the caches. These semantics are the same for both batch and stream processing. The difference is that stream applications run only once, as they are long running, while batch applications may run multiple times, hence the cache may be created and dropped on each run of the application.

Admittedly, there will probably be some resource management requirements for a cached streaming table, such as time-based / size-based retention, to address the infinite-data issue. But such requirements do not change the semantics. You are right that interactive programming is just one use case of cache(); it is not the only use case.

> For me the more important issue is not having the `void cache()` with side effects.

This is indeed the key point. The argument about whether cache() should return something already indicates that cache() and materialize() address different issues. Can you explain a bit more what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?

> I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that users can not write to views or materialised views in SQL, or that users currently can not write to a Table.

I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering about are the following two facts:
1. If and only if a table is mutable (with something like insert()) can a CachedTable exhibit implicit behavior.
2. A CachedTable extends a Table.
From them we can conclude that a CachedTable is mutable and users can insert into the CachedTable directly. This is what I found confusing.

Thanks,

Jiangjie (Becket) Qin

On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi all,

Regarding naming `cache()` vs `materialize()`. One more explanation of why `materialize()` is more natural to me: I think of all "Table"s in the Table API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session, which is limited by the different execution model. That's why "caching" a view is, for me, just materialising it.

However, I see and I understand your point of view. Coming from DataSet/DataStream and, generally speaking, the non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching. Naming is one issue, though, and not that critical to me, especially since once we implement proper materialised views we can always deprecate/rename `cache()` if we deem so.

For me the more important issue is not having a `void cache()` with side effects, exactly for the reasons that you have mentioned. True: results might be non-deterministic if the underlying source tables are changing. The problem is that `void cache()` implicitly changes the semantics of subsequent uses of the cached/materialized Table. It can cause a "wtf" moment for a user if he inserts a "b.cache()" call in some place in his code and suddenly some other random places behave differently. If `materialize()` or `cache()` returns a Table handle, we force the user to explicitly use the cache, which removes the "random" part from "suddenly some other random places are behaving differently".
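The difference between a mutating `void cache()` and a returned handle can be sketched with toy classes (everything below is a hypothetical illustration, not the real Flink API; a mutable list stands in for a changing source table):

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of why a mutating void cache() has action-at-a-distance,
// while a returned handle keeps the original table's semantics intact.
class ToyTable {
    private final List<Integer> source;   // shared, possibly mutable "source table"
    private List<Integer> snapshot;       // non-null once a snapshot was taken

    ToyTable(List<Integer> source) { this.source = source; }

    /** Variant 1: mutates this table in place - all later reads change. */
    void cacheInPlace() { this.snapshot = new ArrayList<>(source); }

    /** Variant 2: returns a separate cached handle - this table is untouched. */
    ToyTable cache() {
        ToyTable t = new ToyTable(source);
        t.snapshot = new ArrayList<>(source);
        return t;
    }

    long count() { return (snapshot != null ? snapshot : source).size(); }
}

public class CacheSideEffectDemo {
    public static void main(String[] args) {
        List<Integer> files = new ArrayList<>(List.of(1, 2, 3));

        // Variant 1: a call possibly buried in some helper method...
        ToyTable b = new ToyTable(files);
        b.cacheInPlace();                // e.g. hidden inside foo(b)
        files.add(4);                    // source changes afterwards
        System.out.println(b.count());   // 3 - distant code silently reads stale data

        // Variant 2: only code that explicitly holds the handle reads the cache.
        ToyTable c = new ToyTable(files);
        ToyTable cachedC = c.cache();
        files.add(5);
        System.out.println(c.count());        // 5 - original table still reads the source
        System.out.println(cachedC.count());  // 4 - reading the cache is an explicit choice
    }
}
```

With the in-place variant, whether `b.count()` returns stale data depends on whether some distant code called `cacheInPlace()`; with the handle variant, only the code that holds `cachedC` ever reads the snapshot.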
This argument, and the others that I have raised (greater flexibility / allowing the user to explicitly bypass the cache), are independent of the `cache()` vs `materialize()` discussion.

> Does that mean one can also insert into the CachedTable? This sounds pretty confusing.

I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that users can not write to views or materialised views in SQL, or that users currently can not write to a Table.

Piotrek

> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

I agree with @Becket that `cache()` and `materialize()` should be considered two different methods, where the latter is more sophisticated.

In my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but since the Table API is a high-level API, it is natural for us to think in a SQL way.

Maybe we can add the `cache()` method to the DataSet API and force users to translate a Table to a DataSet before caching it. Then the users should manually register the cached dataset as a table again (we may need some table replacement mechanism for datasets with an identical schema but different contents here). After all, it is the dataset rather than the dynamic table that needs to be cached, right?

Best,
Xingcan

> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek and Jark,

Thanks for the feedback and explanation. Those are good arguments, but I think they are mostly about materialized views. Let me try to explain why I believe cache() and materialize() are different.

I think cache() and materialize() have quite different implications. An analogy I can think of is save()/publish(). When users call cache(), it is just like they are saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning. Calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), that means "I have something meaningful to be reused by others"; now users need to think about validation, updates & versioning, the lifecycle of the result, etc.

Piotrek's suggestions on variations of the materialize() methods are very useful; it would be great if Flink had them. The concept of a materialized view is actually a pretty big feature, not to mention related things like the triggers/hooks you mentioned earlier. I think materialized views themselves should be discussed in a more thorough and systematic manner, and I find that discussion to be orthogonal to, and way beyond, the interactive programming experience.

The example you gave was interesting. I still have some questions, though.

> Table source = … // some source that scans files from a directory "/foo/bar/"
> Table t1 = source.groupBy(…).select(…).where(…) ….;
> Table t2 = t1.materialize() // (or `cache()`)
>
> t2.count() // initialise cache (if it's lazily initialised)
> int a1 = t1.count()
> int b1 = t2.count()
> // something in the background (or we trigger it) writes new files to /foo/bar
> int a2 = t1.count()
> int b2 = t2.count()
> t2.refresh() // possible future extension, not to be implemented in the initial version

What if someone else added some more files to /foo/bar at this point? In that case, a3 won't equal b3, and the result becomes non-deterministic, right?

> int a3 = t1.count()
> int b3 = t2.count()
> t2.drop() // another possible future extension, manual "cache" dropping

When we talk about interactive programming, in most cases we are talking about batch applications.
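The quoted /foo/bar scenario can be simulated with toy classes (a hypothetical illustration, not the real Flink API) to make the a/b counting semantics concrete:

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of the /foo/bar example: t1 always re-scans the source,
// while t2 reads a materialized snapshot until it is refreshed.
public class MaterializeDemo {
    static class Source { final List<String> files = new ArrayList<>(); }

    static class Table {
        final Source source;
        List<String> snapshot;               // null => always re-scan the source
        Table(Source s) { this.source = s; }

        Table materialize() {                // snapshot the current contents
            Table t = new Table(source);
            t.snapshot = new ArrayList<>(source.files);
            return t;
        }
        void refresh() { snapshot = new ArrayList<>(source.files); }
        int count() { return (snapshot != null ? snapshot : source.files).size(); }
    }

    public static void main(String[] args) {
        Source foo = new Source();
        foo.files.add("part-0");

        Table t1 = new Table(foo);
        Table t2 = t1.materialize();

        int a1 = t1.count(), b1 = t2.count();  // a1 == b1 == 1

        foo.files.add("part-1");               // new file written to /foo/bar
        int a2 = t1.count(), b2 = t2.count();  // a2 == 2 > b2 == 1

        t2.refresh();
        int a3 = t1.count(), b3 = t2.count();  // back in sync: a3 == b3 == 2

        System.out.println(a1 + " " + b1 + " " + a2 + " " + b2 + " " + a3 + " " + b3);
    }
}
```

Running this prints `1 1 2 1 2 2`: a1 == b1, a2 > b2, and after refresh() a3 == b3. If yet another file arrived between computing a3 and b3, the two would diverge again, which is exactly the non-determinism being asked about.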
A fundamental assumption of such cases is that the source data is complete before the data processing begins, and that the data will not change during the processing. IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added.

There are a few cases where computations are executed repeatedly on a changing data source.

For example, people may run an ML training job every hour with the samples newly added in the past hour. In that case, the source data between runs will indeed change. But still, the data remains unchanged within one run. And usually in that case the result will need versioning, i.e. for a given result, it tells you that the result was derived from the source data as of a certain timestamp.

Another example is something like a data warehouse. In this case, there are a few sources of original/raw data. On top of those sources, many materialized views / queries / reports / dashboards can be created to generate derived data. That derived data needs to be updated when the underlying original data changes. In that case, the processing logic that derives the data needs to be executed repeatedly to update those reports/views. Again, all that derived data also needs version management, such as timestamps.

In either of the above two cases, the data cannot change during a single run of the processing logic; otherwise the behavior of the processing logic may be undefined. In the above two examples, when writing the processing logic, users can use .cache() to hint to Flink that those results should be saved to avoid repeated computation. And then, for the result of my application logic, I'll call materialize(), so that these results can be managed by the system with versioning, metadata management, lifecycle management, ACLs, etc.

It is true that we could use materialize() to do the job of cache(), but I am really reluctant to shoehorn cache() into materialize() and force users to worry about a bunch of implications that they needn't have to. I am absolutely on your side that a redundant API is bad. But it is equally frustrating, if not more, when the same API does different things.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:

Thanks Piotrek,
You provided a very good example; it explains all the confusions I had. It is clear that there is something we had not considered in the initial proposal. We intended to force the user to reuse the cached/materialized table once its cache() method has been executed. We did not expect that the user may want to re-execute the plan from the source table. Let me re-think about it and get back to you later.

In the meanwhile, this example/observation also implies that we cannot fully let the optimizer decide the plan when a cache/materialize is explicitly used, because whether to reuse the cached data or to re-execute the query from the source data may lead to different results. (But I guess the optimizer can still help in some cases ---- as long as it does not re-execute from the varied source, we should be safe.)

Regards,
Shaoxuan

On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Shaoxuan,

Re 2:

> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> t1'

What do you mean by "t1 is modified to-> t1'"? That the `methodThatAppliesOperators()` method has changed its plan?

I was thinking more about something like this:

Table source = … // some source that scans files from a directory "/foo/bar/"
Table t1 = source.groupBy(…).select(…).where(…) ….;
Table t2 = t1.materialize() // (or `cache()`)

t2.count() // initialise cache (if it's lazily initialised)

int a1 = t1.count()
int b1 = t2.count()

// something in the background (or we trigger it) writes new files to /foo/bar

int a2 = t1.count()
int b2 = t2.count()

t2.refresh() // possible future extension, not to be implemented in the initial version

int a3 = t1.count()
int b3 = t2.count()

t2.drop() // another possible future extension, manual "cache" dropping

assertTrue(a1 == b1) // same results, but b1 comes from the "cache"
assertTrue(b1 == b2) // both values come from the same cache
assertTrue(a2 > b2) // b2 comes from the cache, a2 re-executed a full table scan and has more data
assertTrue(b3 > b2) // b3 comes from the refreshed cache
assertTrue(b3 == a2 == a3)

Piotrek

> On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> wrote:

Hi,

This is a very interesting and useful design!

Here I want to share some of my thoughts:

1. Agree that the cache() method should return some Table, to avoid unexpected problems caused by mutable objects. All the existing methods of Table return a new Table instance.

2. I think materialize() would be more consistent with SQL; this makes it possible to support the same feature for SQL (materialized views) and keep the same API for users in the future. But I'm also fine if we choose cache().

3. In the proposal, a TableService (or FlinkService?) is used to cache the result of the (intermediate) table. But the name TableService may be a bit too general and is not easy to understand correctly at first glance (a metastore for tables?). Maybe a more specific name would be better, such as TableCacheService or TableMaterializeService or something else.

Best,
Jark

On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Thanks for the clarification, Becket!

I have a few thoughts to share / questions:

1) I'd like to know how you plan to implement the feature on a plan / planner level.

I would imagine the following to happen when Table.cache() is called:

1) Immediately optimize the Table and internally convert it into a DataSet/DataStream. This is necessary to avoid operators of later queries on top of the Table being pushed down.
2) Register the DataSet/DataStream as a DataSet/DataStream-backed Table X.
3) Add a sink to the DataSet/DataStream. This is the materialization of Table X.

Based on your proposal, the following would happen:

Table t1 = ....
t1.cache(); // cache() returns void. The logical plan of t1 is replaced by a scan of X. There is also a reference to the materialization of X.

t1.count(); // this executes the program, including the DataSet/DataStream that backs X and the sink that writes the materialization of X
t1.count(); // this executes the program, but reads X from the materialization.

My question is, how do you determine whether the scan of t1 should go against the DataSet/DataStream program or against the materialization? AFAIK, there is no hook that will tell you that a part of the program was executed. Flipping a switch during optimization or plan generation is not sufficient, as there is no guarantee that the plan is also executed.

Overall, this behavior is somewhat similar to what I proposed in FLINK-8950, which does not include persisting the table, but just optimizing it and re-registering it as a DataSet/DataStream scan.

2) I think Piotr has a point about the implicit behavior and side effects of the cache() method if it does not return anything. Consider the following example:

Table t1 = ???
Table t2 = methodThatAppliesOperators(t1);
Table t3 = methodThatAppliesOtherOperators(t1);

In this case, the behavior/performance of the plan that results from the second method call depends on whether t1 was modified by the first method or not. This is the classic issue of mutable vs. immutable objects. Also, as Piotr pointed out, it might be good to keep the original plan of t1, because in some cases it is possible to push filters down, such that evaluating the query from scratch might be more efficient than accessing the cache.
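The point that re-evaluating from scratch can beat reading the cache connects to the statistics idea raised earlier in this thread: even simple min/max statistics, maintained when the cache is built, can let an optimizer prove that a filter over the cached table returns nothing and skip the scan entirely. A toy sketch of that check (hypothetical illustration, not Flink's actual optimizer):

```java
// Toy sketch of a cheap statistics-based pruning decision: if the min/max
// stats of a column prove that a filter matches no rows, the result is
// provably empty and no scan (cached or otherwise) is needed.
public class StatsPruningDemo {
    static class ColumnStats {
        final long min, max;
        ColumnStats(long min, long max) { this.min = min; this.max = max; }
    }

    /** Decides pruning for a predicate of the form "col > threshold". */
    static boolean canSkipScan(ColumnStats stats, long threshold) {
        // If even the largest value fails the predicate, nothing can match.
        return stats.max <= threshold;
    }

    public static void main(String[] args) {
        ColumnStats ageStats = new ColumnStats(18, 65); // maintained at cache-build time
        System.out.println(canSkipScan(ageStats, 100)); // true  - filter is provably empty
        System.out.println(canSkipScan(ageStats, 40));  // false - a scan is required
    }
}
```

This is the careful flavor of optimization described earlier: the optimizer only ignores the cache when it is certain that doing so is cheaper, e.g. when the filter is provably empty.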
> > > >>>>>>>>>>>>>>>>>>> Moreover, a CachedTable could extend Table() and > > > >>>>> offer a > > > >>>>>>>>> method > > > >>>>>>>>>>>>>>>>> refresh(). > > > >>>>>>>>>>>>>>>>>>> This sounds quite useful in an interactive session > > > >>>>> mode. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> 3) Regarding the name, I can see both arguments. > IMO, > > > >>>>>>>>>>> materialize() > > > >>>>>>>>>>>>>>>>> seems > > > >>>>>>>>>>>>>>>>>>> to be more future proof. > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Best, Fabian > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan > > > >>>>>> Wang < > > > >>>>>>>>>>>>>>>>>>> wshaox...@gmail.com>: > > > >>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Hi Piotr, > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Thanks for sharing your ideas on the method > naming. > > > >>>>> We > > > >>>>>>> will > > > >>>>>>>>>> think > > > >>>>>>>>>>>>>>>> about > > > >>>>>>>>>>>>>>>>>>>> your suggestions. But I don't understand why we > need > > > >>>>> to > > > >>>>>>>>> change > > > >>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>> return > > > >>>>>>>>>>>>>>>>>>>> type of cache(). > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Cache() is a physical operation, it does not > change > > > >>>>> the > > > >>>>>>>> logic > > > >>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>>>> the `Table`. On the tableAPI layer, we should not > > > >>>>>>>> introduce a > > > >>>>>>>>>> new > > > >>>>>>>>>>>>>>>> table > > > >>>>>>>>>>>>>>>>>>>> type unless the logic of table has been changed. > If > > > >>>>> we > > > >>>>>>>>>> introduce > > > >>>>>>>>>>> a > > > >>>>>>>>>>>>>>>> new > > > >>>>>>>>>>>>>>>>>>>> table type `CachedTable`, we need create the same > > set > > > >>>>>> of > > > >>>>>>>>>> methods > > > >>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>>> `Table` > > > >>>>>>>>>>>>>>>>>>>> for it. I don't think it is worth doing this. 
Or > can > > > >>>>>> you > > > >>>>>>>>> please > > > >>>>>>>>>>>>>>>>> elaborate > > > >>>>>>>>>>>>>>>>>>>> more on what could be the "implicit > behaviours/side > > > >>>>>>>> effects" > > > >>>>>>>>>> you > > > >>>>>>>>>>>>> are > > > >>>>>>>>>>>>>>>>>>>> thinking about? > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> Regards, > > > >>>>>>>>>>>>>>>>>>>> Shaoxuan > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski < > > > >>>>>>>>>>>>>>>>> pi...@data-artisans.com> > > > >>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Hi Becket, > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Thanks for the response. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> 1. I wasn’t saying that materialised view must be > > > >>>>>>> mutable > > > >>>>>>>> or > > > >>>>>>>>>>> not. > > > >>>>>>>>>>>>>>>> The > > > >>>>>>>>>>>>>>>>>>>> same > > > >>>>>>>>>>>>>>>>>>>>> thing applies to caches as well. To the > contrary, I > > > >>>>>>> would > > > >>>>>>>>>> expect > > > >>>>>>>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>>>> consistency and updates from something that is > > > >>>>> called > > > >>>>>>>>> “cache” > > > >>>>>>>>>> vs > > > >>>>>>>>>>>>>>>>>>>> something > > > >>>>>>>>>>>>>>>>>>>>> that’s a “materialised view”. In other words, IMO > > > >>>>> most > > > >>>>>>>>> caches > > > >>>>>>>>>> do > > > >>>>>>>>>>>>> not > > > >>>>>>>>>>>>>>>>>>>> serve > > > >>>>>>>>>>>>>>>>>>>>> you invalid/outdated data and they handle updates > > on > > > >>>>>>> their > > > >>>>>>>>>> own. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> 2. I don’t think that having in the future two > very > > > >>>>>>>> similar > > > >>>>>>>>>>>>> concepts > > > >>>>>>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>>>>> `materialized` view and `cache` is a good idea. 
> It > > > >>>>>> would > > > >>>>>>>> be > > > >>>>>>>>>>>>>>>> confusing > > > >>>>>>>>>>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>>>>>> the users. I think it could be handled by > > > >>>>>>>>>> variations/overloading > > > >>>>>>>>>>>>> of > > > >>>>>>>>>>>>>>>>>>>>> materialised view concept. We could start with: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize()` - immutable, > > > >>>>> session > > > >>>>>>>> life > > > >>>>>>>>>>> scope > > > >>>>>>>>>>>>>>>>>>>>> (basically the same semantic as you are proposing > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> And then in the future (if ever) build on top of > > > >>>>>>>> that/expand > > > >>>>>>>>>> it > > > >>>>>>>>>>>>>>>> with: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or > > > >>>>>>>>>>>>> `MaterializedTable > > > >>>>>>>>>>>>>>>>>>>>> materialize(refreshHook=…)` > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Or with cross session support: > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> `MaterializedTable materializeInto(connector=…)` > or > > > >>>>>>>>>>>>>>>> `MaterializedTable > > > >>>>>>>>>>>>>>>>>>>>> materializeInto(tableFactory=…)` > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> I’m not saying that we should implement cross > > > >>>>>>>>>> session/refreshing > > > >>>>>>>>>>>>> now > > > >>>>>>>>>>>>>>>>> or > > > >>>>>>>>>>>>>>>>>>>>> even in the near future. I’m just arguing that > > > >>>>> naming > > > >>>>>>>>> current > > > >>>>>>>>>>>>>>>>> immutable > > > >>>>>>>>>>>>>>>>>>>>> session life scope method `materialize()` is more > > > >>>>>> future > > > >>>>>>>>> proof > > > >>>>>>>>>>> and > > > >>>>>>>>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>>>> consistent with SQL (on which after all table-api > > is > > > >>>>>>>> heavily > > > >>>>>>>>>>>>> basing > > > >>>>>>>>>>>>>>>>>>> on). > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> 3. 
Even if we agree on naming it `cache()`, I > would > > > >>>>>>> still > > > >>>>>>>>>> insist > > > >>>>>>>>>>>>> on > > > >>>>>>>>>>>>>>>>>>>>> `cache()` returning `CachedTable` handle to avoid > > > >>>>>>> implicit > > > >>>>>>>>>>>>>>>>>>>> behaviours/side > > > >>>>>>>>>>>>>>>>>>>>> effects and to give both us & users more > > > >>>>> flexibility. > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> Piotrek > > > >>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin < > > > >>>>>>>> becket....@gmail.com > > > >>>>>>>>>> > > > >>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Just to add a little bit, the materialized view > is > > > >>>>>>>> probably > > > >>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>>> similar > > > >>>>>>>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>>>> the persistent() brought up earlier in the > thread. > > > >>>>> So > > > >>>>>>> it > > > >>>>>>>> is > > > >>>>>>>>>>>>> usually > > > >>>>>>>>>>>>>>>>>>>> cross > > > >>>>>>>>>>>>>>>>>>>>>> session and could be used in a larger scope. For > > > >>>>>>>> example, a > > > >>>>>>>>>>>>>>>>>>>> materialized > > > >>>>>>>>>>>>>>>>>>>>>> view created by user A may be visible to user B. > > It > > > >>>>>> is > > > >>>>>>>>>> probably > > > >>>>>>>>>>>>>>>>>>>> something > > > >>>>>>>>>>>>>>>>>>>>>> we want to have in the future. I'll put it in > the > > > >>>>>>> future > > > >>>>>>>>> work > > > >>>>>>>>>>>>>>>>>>> section. > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin < > > > >>>>>>>>>>> becket....@gmail.com > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek, > > > >>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. 
Right now we are mostly thinking of the cached table as immutable. I can see the materialized view would be useful in the future. That said, I think a simple cache mechanism is probably still needed. So to me, cache() and materialize() should be two separate methods as they address different needs. materialize() is a higher level concept, usually implying periodical update, while cache() has a much simpler semantic. For example, one may create a materialized view and use the cache() method in the materialized view creation logic, so that during the materialized view update they do not need to worry about the case that the cached table has also changed.
Maybe under the hood materialize() and cache() could share some mechanism, but I think a simple cache() method would be handy in a lot of cases.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

> Is there any extra thing user can do on a MaterializedTable that they cannot do on a Table?

Maybe not in the initial implementation, but various DBs offer different ways to "refresh" the materialised view: hooks, triggers, timers, manually, etc. Having `MaterializedTable` would help us to handle that in the future.
> After users call table.cache(), users can just use that table and do anything that is supported on a Table, including SQL.

This is some implicit behaviour with side effects. Imagine a user who has a long and complicated program that touches table `b` multiple times, maybe scattered around different methods. If he modifies his program by inserting in one place

b.cache()

this implicitly alters the semantics and behaviour of his code all over the place, maybe in ways that might cause problems. For example, what if the underlying data is changing?
Having invisible side effects is also not very clean. For example, think about something like this (but more complicated):

Table b = ...;

if (some_condition) {
  processTable1(b)
} else {
  processTable2(b)
}

// do more stuff with b

And the user adds a `b.cache()` call to only one of the `processTable1` or `processTable2` methods.

On the other hand,

Table materialisedB = b.materialize()

avoids (at least some of) the side effect issues and forces the user to explicitly use `materialisedB` where it's appropriate, and forces the user to think about what it actually means.
And if > > > >>>>> something > > > >>>>>>>>> doesn’t > > > >>>>>>>>>>> work > > > >>>>>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>> end > > > >>>>>>>>>>>>>>>>>>>>>>>> for the user, he will know what has he changed > > > >>>>>>> instead > > > >>>>>>>> of > > > >>>>>>>>>>>>> blaming > > > >>>>>>>>>>>>>>>>>>>>> Flink for > > > >>>>>>>>>>>>>>>>>>>>>>>> some “magic” underneath. In the above example, > > > >>>>>> after > > > >>>>>>>>>>>>>>>> materialising > > > >>>>>>>>>>>>>>>>>>> b > > > >>>>>>>>>>>>>>>>>>>> in > > > >>>>>>>>>>>>>>>>>>>>>>>> only one of the methods, he should/would > realise > > > >>>>>>> about > > > >>>>>>>>> the > > > >>>>>>>>>>>>> issue > > > >>>>>>>>>>>>>>>>>>> when > > > >>>>>>>>>>>>>>>>>>>>>>>> handling the return value `MaterializedTable` > of > > > >>>>>> that > > > >>>>>>>>>> method. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> I guess it comes down to personal preferences > if > > > >>>>>> you > > > >>>>>>>> like > > > >>>>>>>>>>>>> things > > > >>>>>>>>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>> be > > > >>>>>>>>>>>>>>>>>>>>>>>> implicit or not. The more power is the user, > > > >>>>>> probably > > > >>>>>>>> the > > > >>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>> likely > > > >>>>>>>>>>>>>>>>>>>>> he is > > > >>>>>>>>>>>>>>>>>>>>>>>> to like/understand implicit behaviour. 
And we > as > > > >>>>>>> Table > > > >>>>>>>>> API > > > >>>>>>>>>>>>>>>>>>> designers > > > >>>>>>>>>>>>>>>>>>>>> are > > > >>>>>>>>>>>>>>>>>>>>>>>> the most power users out there, so I would > > > >>>>> proceed > > > >>>>>>> with > > > >>>>>>>>>>> caution > > > >>>>>>>>>>>>>>>> (so > > > >>>>>>>>>>>>>>>>>>>>> that we > > > >>>>>>>>>>>>>>>>>>>>>>>> do not end up in the crazy perl realm with > it’s > > > >>>>>>> lovely > > > >>>>>>>>>>> implicit > > > >>>>>>>>>>>>>>>>>>>> method > > > >>>>>>>>>>>>>>>>>>>>>>>> arguments ;) < > > > >>>>>>>>>> https://stackoverflow.com/a/14922656/8149051 > > > >>>>>>>>>>>> ) > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> Table API to also support non-relational > > > >>>>>> processing > > > >>>>>>>>> cases, > > > >>>>>>>>>>>>>>>> cache() > > > >>>>>>>>>>>>>>>>>>>>>>>> might be slightly better. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> I think even such extended Table API could > > > >>>>> benefit > > > >>>>>>> from > > > >>>>>>>>>>>>> sticking > > > >>>>>>>>>>>>>>>>>>>>> to/being > > > >>>>>>>>>>>>>>>>>>>>>>>> consistent with SQL where both SQL and Table > API > > > >>>>>> are > > > >>>>>>>>>>> basically > > > >>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>> same. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> One more thing. 
`MaterializedTable > > materialize()` > > > >>>>>>> could > > > >>>>>>>>> be > > > >>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>>>>>>> powerful/flexible allowing the user to operate > > > >>>>> both > > > >>>>>>> on > > > >>>>>>>>>>>>>>>> materialised > > > >>>>>>>>>>>>>>>>>>>>> and not > > > >>>>>>>>>>>>>>>>>>>>>>>> materialised view at the same time for > whatever > > > >>>>>>> reasons > > > >>>>>>>>>>>>>>>> (underlying > > > >>>>>>>>>>>>>>>>>>>>> data > > > >>>>>>>>>>>>>>>>>>>>>>>> changing/better optimisation opportunities > after > > > >>>>>>>> pushing > > > >>>>>>>>>> down > > > >>>>>>>>>>>>>>>> more > > > >>>>>>>>>>>>>>>>>>>>> filters > > > >>>>>>>>>>>>>>>>>>>>>>>> etc). For example: > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Table b = …; > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> MaterlizedTable mb = b.materialize(); > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Val min = mb.min(); > > > >>>>>>>>>>>>>>>>>>>>>>>> Val max = mb.max(); > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Val user42 = b.filter(‘userId = 42); > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Could be more efficient compared to > `b.cache()` > > > >>>>> if > > > >>>>>>>>>>>>>>>> `filter(‘userId > > > >>>>>>>>>>>>>>>>>>> = > > > >>>>>>>>>>>>>>>>>>>>>>>> 42);` allows for much more aggressive > > > >>>>>> optimisations. > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>> Piotrek > > > >>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske < > > > >>>>>>>>>> fhue...@gmail.com> > > > >>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>> I'm not suggesting to add support for Ignite. > > > >>>>> This > > > >>>>>>> was > > > >>>>>>>>>> just > > > >>>>>>>>>>> an > > > >>>>>>>>>>>>>>>>>>>>> example. > > > >>>>>>>>>>>>>>>>>>>>>>>>> Plasma and Arrow sound interesting, too. 
For the sake of this proposal, it would be up to the user to implement a TableFactory and corresponding TableSource / TableSink classes to persist and read the data.

On Mon, Nov 26, 2018 at 12:06 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

What about also adding Apache Plasma + Arrow as an alternative to Apache Ignite?
[1] https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Thanks for the proposal!
To summarize, you propose a new method Table.cache(): Table that will trigger a job and write the result into some temporary storage as defined by a TableFactory.
The cache() call blocks while the job is running and eventually returns a Table object that represents a scan of the temporary table.
When the "session" is closed (closing to be defined?), the temporary tables are all dropped.

I think this behavior makes sense and is a good first step towards more interactive workloads.
However, its performance suffers from writing to and reading from external systems. I think this is OK for now.
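The lifecycle Fabian summarizes (a blocking cache() job, a temporary table tracked by the session, all temporaries dropped on close) can be sketched with a plain-Java mock. All names here (Session, tempTables) are hypothetical, not part of Flink's API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed session-scoped cache lifecycle.
public class SessionCacheSketch {
    static class Session {
        final List<String> tempTables = new ArrayList<>();

        // cache() would block until the job writing `name` to temporary
        // storage (via a TableFactory) finishes, then return a table that
        // scans the temporary result. Here we only model the bookkeeping.
        String cache(String name) {
            String temp = "tmp_" + name;
            tempTables.add(temp);   // tracked so close() can drop it
            return temp;
        }

        // Closing the session drops all temporary tables it created.
        void close() { tempTables.clear(); }
    }

    public static void main(String[] args) {
        Session s = new Session();
        String scan = s.cache("b");
        System.out.println(scan);                 // tmp_b
        s.close();
        System.out.println(s.tempTables.size());  // 0
    }
}
```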
Changes that > > would > > > >>>>>>>>>>> significantly > > > >>>>>>>>>>>>>>>>>>>> improve > > > >>>>>>>>>>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> situation (i.e., pinning data in-memory > > across > > > >>>>>>> jobs) > > > >>>>>>>>>> would > > > >>>>>>>>>>>>>>>> have > > > >>>>>>>>>>>>>>>>>>>>> large > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> impacts on many components of Flink. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Users could use in-memory filesystems or > > > >>>>> storage > > > >>>>>>>> grids > > > >>>>>>>>>>>>> (Apache > > > >>>>>>>>>>>>>>>>>>>>>>>> Ignite) to > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> mitigate some of the performance effects. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Best, Fabian > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb > > > >>>>>> Becket > > > >>>>>>>> Qin > > > >>>>>>>>> < > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> becket....@gmail.com > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> : > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation, Piotrek. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there any extra thing user can do on a > > > >>>>>>>>>>> MaterializedTable > > > >>>>>>>>>>>>>>>>>>> that > > > >>>>>>>>>>>>>>>>>>>>> they > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> cannot do on a Table? After users call > > > >>>>>>>>> *table.cache(), > > > >>>>>>>>>>>>> *users > > > >>>>>>>>>>>>>>>>>>> can > > > >>>>>>>>>>>>>>>>>>>>>>>> just > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> use > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> that table and do anything that is > supported > > > >>>>>> on a > > > >>>>>>>>>> Table, > > > >>>>>>>>>>>>>>>>>>>> including > > > >>>>>>>>>>>>>>>>>>>>>>>> SQL. 
Naming wise, either cache() or materialize() sounds fine to me. cache() is a bit more general than materialize(). Given that we are enhancing the Table API to also support non-relational processing cases, cache() might be slightly better.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

Oops, sorry, I didn't notice that you intend to reuse the existing `TableFactory`. I don't know why, but I assumed that you wanted to provide an alternate way of writing the data.
Now that I hopefully understand the proposal, maybe we could rename `cache()` to

void materialize()

or, going a step further,

MaterializedTable materialize()
MaterializedTable createMaterializedView()

?

The second option, returning a handle, is I think more flexible and could provide features such as "refresh"/"delete" or, generally speaking, managing the view. In the future we could also think about adding hooks to automatically refresh the view, etc.
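The handle-returning option under discussion could look roughly like the following self-contained sketch. Names and behavior (CachedTable, uncache, the plan strings) are illustrative only, not Flink's actual API; the point is that the original Table keeps its plan while the handle reads the cache:

```java
// Hypothetical sketch: cache() returns a new handle instead of mutating the
// receiver, so the optimizer keeps access to the original plan.
public class CachedTableDemo {
    static class Table {
        final String plan;
        Table(String plan) { this.plan = plan; }
        Table filter(String predicate) {
            return new Table(plan + " -> filter(" + predicate + ")");
        }
        // Explicit handle; `this` is left untouched.
        CachedTable cache() { return new CachedTable(plan); }
    }

    static class CachedTable extends Table {
        private boolean released = false;
        CachedTable(String sourcePlan) { super("cacheScan(" + sourcePlan + ")"); }
        void refresh() { /* would re-run the job that produced the cache */ }
        void uncache() { released = true; }  // explicit release, no hidden scope
        boolean isReleased() { return released; }
    }

    public static void main(String[] args) {
        Table b = new Table("scan(b)");
        CachedTable cb = b.cache();
        System.out.println(b.plan);   // original plan untouched: scan(b)
        System.out.println(cb.plan);  // reads the cache: cacheScan(scan(b))
        cb.uncache();
    }
}
```

This is the flexibility argument in miniature: the user chooses per call site whether to read from `b` (source plan, full pushdown opportunities) or from `cb` (the cache).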
It is > > > >>>>> also > > > >>>>>>> more > > > >>>>>>>>>>>>> explicit > > > >>>>>>>>>>>>>>>> - > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialization returning a new table > > handle > > > >>>>>>> will > > > >>>>>>>>> not > > > >>>>>>>>>>> have > > > >>>>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>>>>>>>> same > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> implicit side effects as adding a simple > > > >>>>> line > > > >>>>>> of > > > >>>>>>>>> code > > > >>>>>>>>>>> like > > > >>>>>>>>>>>>>>>>>>>>>>>>>> `b.cache()` > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> would have. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> It would also be more SQL like, making it > > > >>>>> more > > > >>>>>>>>>> intuitive > > > >>>>>>>>>>>>> for > > > >>>>>>>>>>>>>>>>>>>> users > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> already > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> familiar with the SQL. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin < > > > >>>>>>>>>>>>> becket....@gmail.com > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek, > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the cache() method itself, yes, it > is > > > >>>>>>>>> equivalent > > > >>>>>>>>>> to > > > >>>>>>>>>>>>>>>>>>>> creating > > > >>>>>>>>>>>>>>>>>>>>> a > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> BUILT-IN > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> materialized view with a lifecycle. That > > > >>>>>>>>>> functionality > > > >>>>>>>>>>> is > > > >>>>>>>>>>>>>>>>>>>> missing > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> today, > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> though. Not sure if I understand your > > > >>>>>> question. 
Do you mean we already have the functionality and just need syntactic sugar?

What's more interesting in the proposal is: do we want to stop at creating the materialized view? Or do we want to extend that in the future to a more useful unified data store distributed with Flink? And do we want to have a mechanism that allows more flexible user job patterns with their own user-defined services? These considerations are much more architectural.
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Interesting idea. I'm trying to understand the problem. Isn't the `cache()` call an equivalent of writing data to a sink and later reading from it, where this sink has a limited live scope/live time? And the sink could be implemented as an in-memory or a file sink?

If so, what's the problem with creating a materialised view from a table "b" (from your document's example) and reusing this materialised view later?
Maybe we are lacking mechanisms > to > > > >>>>>>> clean > > > >>>>>>>> up > > > >>>>>>>>>>>>>>>>>>>> materialised > > > >>>>>>>>>>>>>>>>>>>>>>>>>>> views > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (for > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example when current session finishes)? > > > >>>>>> Maybe > > > >>>>>>> we > > > >>>>>>>>>> need > > > >>>>>>>>>>>>> some > > > >>>>>>>>>>>>>>>>>>>>>>>>>> syntactic > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> sugar > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on top of it? > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin < > > > >>>>>>>>>>>>>>>> becket....@gmail.com > > > >>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng. > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I think it makes sense to have a > > > >>>>>>> persist() > > > >>>>>>>>>> with > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>> lifecycle/defined > > > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> scope. I just added a section in the > > > >>>>> future > > > >>>>>>>> work > > > >>>>>>>>>> for > > > >>>>>>>>>>>>>>>> this. 
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie,

Thank you for the explanation about the name of `cache()`; I understand why you designed it this way!

Another idea is whether we can specify a lifecycle for data persistence. For example, persist(LifeCycle.SESSION), so that the user is not worried about data loss and can clearly specify how long the data is kept. At the same time, if we want to expand this, we could also share within a certain group of sessions, for example LifeCycle.SESSION_GROUP(...). I am not sure; just an immature suggestion, for reference only!

Bests,
Jincheng

Becket Qin <becket....@gmail.com> wrote on Fri, Nov 23, 2018, 1:33 PM:

Re: Jincheng,

Thanks for the feedback. Regarding cache() vs. persist(), personally I find cache() to describe the behavior more accurately, i.e. the Table is cached for the session but will be deleted after the session is closed. persist() seems a little misleading, as people might think the table will still be there even after the session is gone.

Great point about mixing batch and stream processing in the same job. We should absolutely move towards that goal. I imagine that would be a huge change across the board, including sources, operators and optimizations, to name some. Likely we will need several separate in-depth discussions.
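Jincheng's persist(LifeCycle.SESSION) idea could look roughly like the following. This is a toy sketch only, not actual Flink API: `LifeCycle`, the `Table` stub, and this `persist()` signature are all hypothetical illustrations of the lifecycle hint being carried on the returned Table.

```java
import java.util.Optional;

// Hypothetical lifecycle scopes, as suggested in the thread.
enum LifeCycle { SESSION, SESSION_GROUP }

final class Table {
    private final String name;
    private final LifeCycle persistScope; // null = no persistence hint

    Table(String name) { this(name, null); }
    private Table(String name, LifeCycle scope) { this.name = name; this.persistScope = scope; }

    /** Returns a new Table carrying the persistence hint; the original is untouched. */
    Table persist(LifeCycle scope) { return new Table(name, scope); }

    Optional<LifeCycle> persistScope() { return Optional.ofNullable(persistScope); }
    String name() { return name; }
}
```

The point of the sketch is that the scope is just metadata on the table; the framework remains free to decide where (or whether) the data is actually materialized.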
Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

@Shaoxuan, I think the lifecycle and access domain are both orthogonal to the cache problem. Essentially, this may be the first time we plan to introduce another storage mechanism other than the state. Maybe it's better to first draw a big picture and then concentrate on a specific part?

@Becket, yes, actually I am more concerned with the underlying service. This seems to be quite a major change to the existing codebase. As you claimed, the service should be extensible to support other components, so we had better discuss it in another thread.

All in all, I am also eager to enjoy the more interactive Table API, given a general and flexible enough service mechanism.

Best,
Xingcan

On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

Relying on a callback on the temp table for cleanup is not very reliable. There is no guarantee that it will be executed successfully.
We may risk leaks when that happens. I think it's safer to have an association between a temp table and a session id, so we can always clean up temp tables which are no longer associated with any active sessions.

Regards,
Xiaowei

On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie & Shaoxuan,

Thanks for initiating this great proposal!

Interactive programming is very useful and user-friendly in the case of your examples.
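Xiaowei's suggestion of keying temp tables by session id, instead of relying on cleanup callbacks, could be sketched with a small registry. Everything here (`TempTableRegistry`, `sweep`) is an illustrative name invented for this sketch, not an existing Flink class.

```java
import java.util.*;

// Toy registry: temp tables are owned by a session, and a periodic sweep
// drops every table whose owning session is no longer active.
final class TempTableRegistry {
    private final Map<String, Set<String>> tablesBySession = new HashMap<>();

    void register(String sessionId, String tableName) {
        tablesBySession.computeIfAbsent(sessionId, k -> new HashSet<>()).add(tableName);
    }

    /** Returns the temp tables dropped because their session is gone. */
    Set<String> sweep(Set<String> activeSessions) {
        Set<String> dropped = new HashSet<>();
        Iterator<Map.Entry<String, Set<String>>> it = tablesBySession.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> e = it.next();
            if (!activeSessions.contains(e.getKey())) {
                dropped.addAll(e.getValue());
                it.remove();
            }
        }
        return dropped;
    }
}
```

Because cleanup is driven by the liveness of sessions rather than by a callback firing, a crashed client cannot leak its temp tables past the next sweep.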
Moreover, especially when a business process has to be executed in several stages with dependencies, such as the pipeline of Flink ML, we have to submit a job via env.execute() in order to utilize the intermediate calculation results.

About `cache()`: I think it is better named `persist()`, and the Flink framework should determine whether we internally cache in memory or persist to a storage system, maybe saving the data into a state backend (MemoryStateBackend or RocksDBStateBackend, etc.).
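To make the motivation concrete, here is a minimal model of the difference a cache hint makes when an intermediate result is used across several job submissions: without the hint, its lineage is recomputed every time. `IntermediateResult` and its methods are purely hypothetical stand-ins for a Table and its plan; note that, as proposed at the top of the thread, `cache()` just marks the table and returns it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class IntermediateResult {
    private final Supplier<Integer> compute; // stands in for the table's lineage
    private Integer cached;                  // populated only when the hint is set
    private boolean cacheHint;

    IntermediateResult(Supplier<Integer> compute) { this.compute = compute; }

    /** Just a hint: marks the table as cacheable and returns it. */
    IntermediateResult cache() { this.cacheHint = true; return this; }

    int execute() {
        if (cacheHint) {
            if (cached == null) cached = compute.get();
            return cached;
        }
        return compute.get(); // recomputed from lineage on every submission
    }
}
```

Since the hint is advisory, a real optimizer could still ignore the cached copy when recomputation (or statistics-based pruning) is cheaper, which is the semantics discussed above.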
BTW, from my point of view, in the future, support for switching between streaming and batch mode in the same job will also benefit "Interactive Programming". I am looking forward to your JIRAs and FLIP!

Best,
Jincheng

Becket Qin <becket....@gmail.com> wrote on Tue, Nov 20, 2018, 9:56 PM:

Hi all,

As a few recent email threads have pointed out, there is a promising opportunity to enhance the Flink Table API in various aspects, including functionality and ease of use among others. One of the scenarios where we feel Flink could improve is interactive programming. To explain the issues and facilitate the discussion on the solution, we put together the following document with our proposal.

https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing

Feedback and comments are very welcome!

Thanks,

Jiangjie (Becket) Qin

--
Best Regards

Jeff Zhang