Hi Becket,

With `uncache` there are probably two features that we can think about:
a) Physically dropping the cached table from the storage, freeing up the resources
b) Hinting the optimizer not to cache the reads for the next query/table

a) Has the issue I wrote about before: it seems to be an operation inherently "flawed" with side effects. I'm not sure how it would best be expressed. We could make it work:

1. Via a method on a Table, as you proposed:
void Table#dropCache()
void Table#uncache()

2. An operation on the environment:
env.dropCacheFor(table) // or some other argument that allows the user to identify the desired cache

3. Extending (from your original design doc) the `setTableService` method to return some control handle, like:
TableServiceControl setTableService(TableFactory tf, TableProperties properties, TempTableCleanUpCallback cleanUpCallback);
(TableServiceControl? TableService? TableServiceHandle? CacheService?)
And having the drop cache method there:
TableServiceControl#dropCache(table)

Out of those options, option 1 might have the disadvantage of not making the user aware that this is a global operation with side effects. Like the old example of:

public void foo(Table t) {
  // …
  t.dropCache();
}

It might not be immediately obvious that `t.dropCache()` is some kind of global operation, with side effects visible outside of the `foo` function.

On the other hand, both option 2 and option 3 have a greater chance of catching the user's attention:

public void foo(Table t, CacheService cacheService) {
  // …
  cacheService.dropCache(t);
}

b) could be achieved quite easily:

Table a = …
val notCached1 = a.doNotCache()
val cachedA = a.cache()
val notCached2 = cachedA.doNotCache() // equivalent of notCached1

`doNotCache()` would behave similarly to `cache()` - return a copy of the table with the "cache" hint removed and/or a "never cache" hint added.

Piotrek

> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for the proposal and detailed explanation. I like the idea of returning a new hinted Table without modifying the original table. This also leaves room for users to benefit from future implicit caching.
> 
> Just to make sure I get the full picture. In your proposal, there will also be a 'void Table#uncache()' method to release the cache, right?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com> wrote:
> 
>> Hi Becket!
>> 
>> After further thinking I tend to agree that my previous proposal (*Option 2*) indeed might not be ideal if we were to introduce automatic caching in the future. However I would like to propose a slightly modified version of it:
>> 
>> *Option 4*
>> 
>> Adding a `cache()` method with the following signature:
>> 
>> Table Table#cache();
>> 
>> Without side effects: the `cache()` call does not modify/change the original Table in any way. It would return a copy of the original table, with an added hint for the optimizer to cache the table, so that future accesses to the returned table might be cached or not.
>> 
>> Assuming that we are talking about a setup where we do not have automatic caching enabled (a possible future extension):
>> 
>> Example #1:
>> 
>> ```
>> Table a = …
>> a.foo() // not cached
>> 
>> val cachedA = a.cache();
>> 
>> cachedA.bar() // maybe cached
>> a.foo() // same as before - effectively not cached
>> ```
>> 
>> Both the first and the second `a.foo()` operations would behave in exactly the same way. Again, the `a.cache()` call doesn't affect `a` itself. If `a` was not hinted for caching before `a.cache();`, then neither `a.foo()` call would use a cache. The returned `cachedA` would carry the "cache" hint, so `cachedA.bar()` would probably go through the cache (unless the optimiser decides the opposite).
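To make the copy-with-hint semantics of Example #1 (and the `doNotCache()` idea at the top of this mail) concrete, here is a minimal, self-contained sketch. All names are made up for illustration; this is not a proposed API:

```java
// Sketch: cache()/doNotCache() return hinted copies and never mutate the
// receiver, exactly like filter()/select() return new Table instances.
final class HintedTable {
    enum CacheHint { NONE, CACHE, NEVER_CACHE }

    private final String plan;   // stand-in for the logical plan
    final CacheHint hint;        // immutable hint for the optimizer

    HintedTable(String plan, CacheHint hint) {
        this.plan = plan;
        this.hint = hint;
    }

    HintedTable cache()      { return new HintedTable(plan, CacheHint.CACHE); }
    HintedTable doNotCache() { return new HintedTable(plan, CacheHint.NEVER_CACHE); }

    public static void main(String[] args) {
        HintedTable a = new HintedTable("scan(src)", CacheHint.NONE);
        HintedTable cachedA = a.cache();
        HintedTable notCached = cachedA.doNotCache(); // equivalent of a.doNotCache()
        System.out.println(a.hint + " " + cachedA.hint + " " + notCached.hint);
        // prints: NONE CACHE NEVER_CACHE - `a` itself was never modified
    }
}
```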
>> Example #2
>> 
>> ```
>> Table a = …
>> 
>> a.foo() // not cached
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // maybe cached
>> 
>> val c = b.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // same as before - effectively maybe cached
>> c.foo() // maybe cached
>> ```
>> 
>> Now, assuming that we have some future "automatic caching optimisation":
>> 
>> Example #3
>> 
>> ```
>> env.enableAutomaticCaching()
>> Table a = …
>> 
>> a.foo() // might be cached, depending on whether `a` was selected for automatic caching
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - might be cached, if `a` was selected for automatic caching
>> b.foo() // maybe cached
>> ```
>> 
>> More or less this is the same behaviour as:
>> 
>> Table a = ...
>> val b = a.filter(x > 20)
>> 
>> Calling `filter` hasn't changed or altered `a` in any way. If `a` was previously filtered:
>> 
>> Table src = …
>> val a = src.filter(x > 20)
>> val b = a.filter(x > 20)
>> 
>> then yes, `a` and `b` will be the same. But the point is that neither `filter` nor `cache` changes the original `a` table.
>> 
>> One thing is true: the physical drop-cache operation will have side effects and will, in a way, mutate the cached table references. But I think this is unavoidable in any solution - it is the same issue as calling `.close()`, or calling a destructor in C++.
>> 
>> Piotrek
>> 
>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Happy New Year, everybody!
>>> 
>>> I would like to resume this discussion thread. At this point, we have agreed on the first step goal of interactive programming. The open discussion is the exact API. More specifically, what should the *cache()* method return and what are its semantics. There are three options:
>>> 
>>> *Option 1*
>>> *void cache()* OR *Table cache()* which returns the original table for chained calls.
>>> *void uncache()* releases the cache.
>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>> 
>>> - Semantic: a.cache() hints that table 'a' should be cached. The optimizer decides whether the cache will be used or not.
>>> - pros: simple and no confusion between CachedTable and original table
>>> - cons: A table may be cached / uncached in a method invocation, while the caller does not know about this.
>>> 
>>> *Option 2*
>>> *CachedTable cache()*
>>> *CachedTable* extends *Table* with an additional *uncache()* method
>>> 
>>> - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will always use the cache. *a.bar()* will always use the original DAG.
>>> - pros: No potential side effects in method invocation.
>>> - cons: The optimizer has no chance to kick in. Future optimization will become a behavior change and need users to change the code.
>>> 
>>> *Option 3*
>>> *CacheHandle cache()*
>>> *CacheHandle.release()* to release a cache handle on the table. If all cache handles are released, the cache could be removed.
>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>> 
>>> - Semantic: *a.cache()* hints that 'a' should be cached. The optimizer decides whether the cache will be used or not. The cache is released either when no handle is on it, or when the user program exits.
>>> - pros: No potential side effect in method invocation. No confusion between the cached table vs. the original table.
>>> - cons: An additional CacheHandle exposed to the users.
>>> 
>>> Personally I prefer option 3 for the following reasons:
>>> 1. It is simple. The vast majority of users would just call *a.cache()* followed by *a.foo()*, *a.bar()*, etc.
>>> 2. There is no semantic ambiguity and no semantic change if we decide to add implicit caching in the future.
>>> 3. There is no side effect in the method calls.
>>> 4. Admittedly we need to expose one more CacheHandle class to the users. But it is not that difficult to understand given similar well-known concepts like ref counting (we can name it CacheReference if that is easier to understand). So I think it is fine.
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com> wrote:
>>> 
>>>> Hi Piotrek,
>>>> 
>>>> 1. Regarding optimization.
>>>> Sure, there are many cases where the decision is hard to make. But that does not make it any easier for the users to make those decisions. I imagine 99% of the users would just naively use cache. I am not saying we can optimize in all cases. But as long as we agree that at least in certain cases (I would argue most cases) the optimizer can do a little better than an average user who likely knows little about Flink internals, we should not push the burden of optimization to users.
>>>> 
>>>> BTW, it seems some of your concerns are related to the implementation. I did not mention the implementation of the caching service because that should not affect the API semantics. Not sure if this helps, but imagine the default implementation has one StorageNode service colocating with each TM. It could be running within the TM process or in a standalone process, depending on configuration.
>>>> 
>>>> The StorageNode uses a memory + spill-to-disk mechanism. The cached data will just be written to the local StorageNode service. If the StorageNode is running within the TM process, the in-memory cache could just be objects, so we save some serde cost. A later job referring to the cached Table will be scheduled in a locality-aware manner, i.e. run in the TM whose peer StorageNode hosts the data.
>>>> 
>>>> 2. Semantic
>>>> I am not sure why introducing a new hintCache() or env.enableAutomaticCaching() method would avoid the consequence of semantic change.
>>>> 
>>>> If the auto optimization is not enabled by default, users still need to make code changes to all existing programs in order to get the benefit.
>>>> If the auto optimization is enabled by default, advanced users who know that they really want to use the cache will suddenly lose the opportunity to do so, unless they change the code to disable auto optimization.
>>>> 
>>>> 3. Side effect
>>>> The CacheHandle is not only about where to put uncache(). It is to solve the implicit performance impact by moving the uncache() to the CacheHandle.
>>>> 
>>>> - If users want to leverage the cache, they can call a.cache(). After that, unless the user explicitly releases that CacheHandle, a.foo() will always leverage the cache if needed (the optimizer may choose to ignore the cache if that helps accelerate the process). Any function call will not be able to release the cache, because it does not have that CacheHandle.
>>>> - If some advanced users do not want to use the cache at all, they will call a.hint(ignoreCache).foo(). This will for sure ignore the cache and use the original DAG to process.
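As a side note on the memory + spill-to-disk StorageNode described earlier in this mail, a very rough sketch of such a store could look like the following. The names, the row budget and the String rows are all made up for illustration:

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

// Keep rows on the heap up to a budget, then spill the remainder to a
// local file; read() returns the in-memory rows plus the spilled ones.
final class SpillingStore {
    private static final int IN_MEMORY_BUDGET = 10_000; // rows kept on heap
    private final List<String> memory = new ArrayList<>();
    private final File spillFile;
    private PrintWriter spill;

    SpillingStore(File spillFile) { this.spillFile = spillFile; }

    void write(String row) throws IOException {
        if (memory.size() < IN_MEMORY_BUDGET) {
            memory.add(row);                     // fast path: cache in memory
        } else {
            if (spill == null) {
                spill = new PrintWriter(new FileWriter(spillFile));
            }
            spill.println(row);                  // slow path: spill to disk
        }
    }

    List<String> read() throws IOException {
        List<String> rows = new ArrayList<>(memory);
        if (spill != null) {
            spill.flush();
            try (BufferedReader in = new BufferedReader(new FileReader(spillFile))) {
                String line;
                while ((line = in.readLine()) != null) {
                    rows.add(line);
                }
            }
        }
        return rows;
    }
}
```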
>>>> 
>>>>> In vast majority of the cases, users wouldn't really care whether the cache is used or not.
>>>>> I wouldn’t agree with that, because “caching” (if not purely in-memory caching) would add additional IO costs. It’s similar to saying that users would not see a difference between Spark/Flink and MapReduce (MapReduce writes data to disks after every map/reduce stage).
>>>> 
>>>> What I wanted to say is that in most cases, after users call cache(), they don't really care about whether auto optimization has decided to ignore the cache or not, as long as the program runs faster.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Thanks for the quick answer :)
>>>>> 
>>>>> Re 1.
>>>>> 
>>>>> I generally agree with you, however a couple of points:
>>>>> 
>>>>> a) the problem with using automatic caching is bigger, because you will have to decide how to compare IO vs CPU costs, and if you pick wrong, the additional IO costs might be enormous or can even crash your system. This is a more difficult problem compared to, let's say, join reordering, where the only issue is to have good statistics that can capture correlations between columns (when you reorder joins, the number of IO operations does not change).
>>>>> c) your example is completely independent of caching.
>>>>> 
>>>>> A query like this:
>>>>> 
>>>>> src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 === `f2).as('f3, …).filter('f3 > 30)
>>>>> 
>>>>> should/could be optimised to an empty result immediately, without the need for any cache/materialisation, and that should work even without any statistics provided by the connector.
>>>>> 
>>>>> For me, the prerequisite to any serious cost-based optimisations would be some reasonable benchmark coverage of the code (tpch?). Otherwise that would be the equivalent of adding untested code, since we wouldn't be able to verify our assumptions, like how the writing of 10,000 records to a cache/RocksDB/Kafka/CSV file compares to joining/filtering/processing of, let's say, 1,000,000 rows.
>>>>> 
>>>>> Re 2.
>>>>> 
>>>>> I wasn’t proposing to change the semantics later. I was proposing that we start now:
>>>>> 
>>>>> CachedTable cachedA = a.cache()
>>>>> cachedA.foo() // Cache is used
>>>>> a.bar() // Original DAG is used
>>>>> 
>>>>> And then later we can think about adding, for example,
>>>>> 
>>>>> CachedTable cachedA = a.hintCache()
>>>>> cachedA.foo() // Cache might be used
>>>>> a.bar() // Original DAG is used
>>>>> 
>>>>> Or
>>>>> 
>>>>> env.enableAutomaticCaching()
>>>>> a.foo() // Cache might be used
>>>>> a.bar() // Cache might be used
>>>>> 
>>>>> Or (I would still not like this option):
>>>>> 
>>>>> a.hintCache()
>>>>> a.foo() // Cache might be used
>>>>> a.bar() // Cache might be used
>>>>> 
>>>>> Or whatever else comes to our mind. Even if we add some automatic caching in the future, keeping explicit (`CachedTable cache()`) caching will still be useful, at least in some cases.
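As a type-level sketch of why these variants could coexist (interface names taken from this thread, bodies purely illustrative):

```java
// The explicit API proposed as the starting point:
interface Table {
    CachedTable cache();   // explicit: consumers of the result read the cache
}

interface CachedTable extends Table {
    void dropCache();      // release the materialized result
}

// Possible later, purely additive extensions (sketched, not proposed here):
// interface Table            { CachedTable hintCache(); }      // optimizer may bypass
// interface TableEnvironment { void enableAutomaticCaching(); }
```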
>>>>> Re 3.
>>>>> 
>>>>>> 2. The source tables are immutable during one run of batch processing logic.
>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>> 
>>>>>> I think assumptions 2 and 3 are by definition what batch processing means, i.e. the data must be complete before it is processed and should not change when the processing is running.
>>>>> 
>>>>> I agree that this is how batch systems SHOULD be working. However, I know from my previous experience that it's not always the case. Sometimes users are just working on some non-transactional storage, which can be (either constantly or occasionally) modified by some other processes for whatever reason (fixing the data, updating, adding new data, etc).
>>>>> 
>>>>> But even if we ignore this point (data immutability), the performance side effect issue of your proposal remains. If a user calls `void a.cache()` deep inside some private method, it will have implicit side effects on other parts of his program that might not be obvious.
>>>>> 
>>>>> Re `CacheHandle`.
>>>>> 
>>>>> If I understand it correctly, it only addresses the issue of where to place the method `uncache`/`dropCache`.
>>>>> 
>>>>> Btw,
>>>>> 
>>>>>> In vast majority of the cases, users wouldn't really care whether the cache is used or not.
>>>>> 
>>>>> I wouldn’t agree with that, because “caching” (if not purely in-memory caching) would add additional IO costs. It’s similar to saying that users would not see a difference between Spark/Flink and MapReduce (MapReduce writes data to disks after every map/reduce stage).
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Piotrek,
>>>>>> 
>>>>>> Not sure if you noticed: in my last email, I was proposing `CacheHandle cache()` to avoid the potential side effect due to function calls.
>>>>>> 
>>>>>> Let's look at the disagreements in your reply one by one.
>>>>>> 
>>>>>> 1. Optimization chances
>>>>>> 
>>>>>> Optimization is never trivial work. This is exactly why we should not let the user do it manually. Databases have done a huge amount of work in this area. At Alibaba, we rely heavily on many optimization rules to boost the SQL query performance.
>>>>>> 
>>>>>> In your example, if I fill in the filter conditions in a certain way, the optimization becomes obvious.
>>>>>> 
>>>>>> Table src1 = … // read from connector 1
>>>>>> Table src2 = … // read from connector 2
>>>>>> 
>>>>>> Table a = src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 === `f2).as('f3, ...)
>>>>>> a.cache() // write cache to connector 3; when writing the records, remember min and max of `f1
>>>>>> 
>>>>>> a.filter('f3 > 30) // There is no need to read from any connector, because `a` does not contain any record whose 'f3 is greater than 30.
>>>>>> env.execute()
>>>>>> a.select(…)
>>>>>> 
>>>>>> BTW, it seems to me that adding some basic statistics is fairly straightforward and the cost is pretty marginal, if not ignorable. In fact it is not only needed for optimization, but also for cases such as ML, where some algorithms may need to decide their parameters based on the statistics of the data.
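The min/max idea above is easy to make concrete. A tiny sketch, with hypothetical names and long-typed fields for brevity:

```java
// Statistics recorded while materializing a cache can prove a later
// filter empty, so the optimizer can skip reading the cache entirely.
final class FieldStats {
    final long min, max;
    FieldStats(long min, long max) { this.min = min; this.max = max; }

    // True if no cached row can satisfy "field > threshold".
    boolean provesEmptyGreaterThan(long threshold) { return max <= threshold; }
}

class PruningDemo {
    public static void main(String[] args) {
        // Suppose writing the cache for `a` recorded min/max of 'f3 as [11, 29]
        FieldStats f3 = new FieldStats(11, 29);
        if (f3.provesEmptyGreaterThan(30)) {
            System.out.println("a.filter('f3 > 30): empty result, no scan needed");
        }
    }
}
```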
>>>>>> 2. Same API, one semantic now, another semantic later.
>>>>>> 
>>>>>> I am trying to understand the semantics of the `CachedTable cache()` you are proposing. IMO, we should avoid designing an API whose semantics will be changed later. If we have a "CachedTable cache()" method, then the semantics should be very clearly defined upfront and not change later. It should never be "right now let's go with semantic 1, later we can silently change it to semantic 2 or 3". Such a change could have bad consequences. For example, let's say we decide to go with semantic 1:
>>>>>> 
>>>>>> CachedTable cachedA = a.cache()
>>>>>> cachedA.foo() // Cache is used
>>>>>> a.bar() // Original DAG is used.
>>>>>> 
>>>>>> Now the majority of users would be using cachedA.foo() in their code. And some advanced users will use a.bar() to explicitly skip the cache. Later on, we add smart optimization and change the semantic to semantic 2:
>>>>>> 
>>>>>> CachedTable cachedA = a.cache()
>>>>>> cachedA.foo() // Cache is used
>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip the cache if it is faster.
>>>>>> 
>>>>>> Now most of the users who were writing cachedA.foo() will not benefit from this optimization at all, unless they change their code to use a.foo() instead. And those advanced users suddenly lose the option to explicitly ignore the cache unless they change their code (assuming we care enough to provide something like hint(useCache)). If we don't define the semantics carefully, our users will have to change their code again and again when they shouldn't have to.
>>>>>> 
>>>>>> 3. Side effect.
>>>>>> 
>>>>>> Before we talk about side effects, we have to agree on the assumptions. The assumptions I have are the following:
>>>>>> 1. We are talking about batch processing.
>>>>>> 2. The source tables are immutable during one run of batch processing logic.
>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>>> 
>>>>>> I think assumptions 2 and 3 are by definition what batch processing means, i.e. the data must be complete before it is processed and should not change while the processing is running.
>>>>>> 
>>>>>> As far as I am aware, I don't know of any batch processing system breaking those assumptions. Even for relational database tables, where queries can run with concurrent modifications, the necessary locking is still required to ensure the integrity of the query result.
>>>>>> 
>>>>>> Please let me know if you disagree with the above assumptions. If you agree with these assumptions, with the `CacheHandle cache()` API in my last email, do you still see side effects?
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jiangjie (Becket) Qin
>>>>>> 
>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>> 
>>>>>>> Hi Becket,
>>>>>>> 
>>>>>>>> Regarding the chance of optimization, it might not be that rare. Some very simple statistics could already help in many cases. For example, simply maintaining the max and min of each field can already eliminate some unnecessary table scans (potentially scanning the cached table) if the result is doomed to be empty. A histogram would give even further information.
>>>>>>>> The optimizer could be very careful and only ignore the cache when it is 100% sure doing so is cheaper, e.g. only when a filter on the cache will absolutely return nothing.
>>>>>>> 
>>>>>>> I do not see how this might be easy to achieve. It would require tons of effort to make it work, and in the end you would still have the problem of comparing/trading CPU cycles vs IO. For example:
>>>>>>> 
>>>>>>> Table src1 = … // read from connector 1
>>>>>>> Table src2 = … // read from connector 2
>>>>>>> 
>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …)
>>>>>>> a.cache() // write cache to connector 3
>>>>>>> 
>>>>>>> a.filter(…)
>>>>>>> env.execute()
>>>>>>> a.select(…)
>>>>>>> 
>>>>>>> The decision whether it’s better to:
>>>>>>> A) read from connector1/connector2, filter/map and join them twice
>>>>>>> B) read from connector1/connector2, filter/map and join them once, and pay the price of writing to connector 3 and then reading from it
>>>>>>> is very far from trivial. `a` can end up much larger than `src1` and `src2`, writes to connector 3 might be extremely slow, reads from connector 3 can be slower compared to reads from connectors 1 & 2, … . You really need extremely good statistics to correctly assess the size of the output, and it would still fail many times (correlations etc). And keep in mind that at the moment we do not have ANY statistics at all. More than that, it would require significantly more testing and setting up some benchmarks to make sure that we do not break it with some regressions.
>>>>>>> 
>>>>>>> That’s why I’m strongly opposing this idea - at least let’s not start with this. If we first start with completely manual/explicit caching, without any magic, it would be a significant improvement for the users for a fraction of the development cost. After implementing that, when we already have all of the working pieces, we can start working on some optimisation rules. As I wrote before, if we start with
>>>>>>> 
>>>>>>> `CachedTable cache()`
>>>>>>> 
>>>>>>> we can later work on follow-up stories to make it automatic. Despite the fact that I don’t like the implicit/side-effect approach with a `void` method, having an explicit `CachedTable cache()` wouldn’t even prevent us from later adding a `void hintCache()` method, with the exact semantics that you want.
>>>>>>> 
>>>>>>> On top of that, I raise again that having an implicit `void cache()/hintCache()` has other side effects and problems with non-immutable data, and is annoying when used secretly inside methods.
>>>>>>> 
>>>>>>> Explicit `CachedTable cache()` just looks like a much less controversial MVP, and if we decide to go further with this topic, it’s not a wasted effort, but lies on a straight path to more advanced/complicated solutions in the future. Are there any drawbacks of starting with `CachedTable cache()` that I’m missing?
>>>>>>> 
>>>>>>> Piotrek
>>>>>>> 
>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Becket,
>>>>>>>> 
>>>>>>>> Introducing CacheHandle seems too complicated. That means users have to maintain the handle properly.
>>>>>>>> And since cache is just a hint for the optimizer, why not just return the Table itself from the cache method? This hint info should be kept in the Table, I believe.
>>>>>>>> 
>>>>>>>> So how about adding cache and uncache methods to Table, both returning Table? Because what cache and uncache do is just add some hint info into the Table.
>>>>>>>> 
>>>>>>>> On Wed, Dec 12, 2018 at 11:25 AM, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi Till and Piotrek,
>>>>>>>>> 
>>>>>>>>> Thanks for the clarification. That resolves quite a few confusions. My understanding of how cache works is the same as what Till describes, i.e. cache() is a hint to Flink, but it is not guaranteed that the cache always exists, and it might be recomputed from its lineage.
>>>>>>>>> 
>>>>>>>>>> Is this the core of our disagreement here? That you would like this “cache()” to be mostly a hint for the optimiser?
>>>>>>>>> 
>>>>>>>>> Semantics-wise, yes. That's also why I think materialize() has a much larger scope than cache(), and thus should be a different method.
>>>>>>>>> 
>>>>>>>>> Regarding the chance of optimization, it might not be that rare. Some very simple statistics could already help in many cases. For example, simply maintaining the max and min of each field can already eliminate some unnecessary table scans (potentially scanning the cached table) if the result is doomed to be empty. A histogram would give even further information. The optimizer could be very careful and only ignore the cache when it is 100% sure doing so is cheaper, e.g. only when a filter on the cache will absolutely return nothing.
>>>>>>>>> 
>>>>>>>>> Given the above clarification on cache, I would like to revisit the original "void cache()" proposal and see if we can improve on top of that.
>>>>>>>>> 
>>>>>>>>> What do you think about the following modified interface?
>>>>>>>>> 
>>>>>>>>> Table {
>>>>>>>>>   /**
>>>>>>>>>    * This call hints Flink to maintain a cache of this table and leverage it for performance optimization if needed.
>>>>>>>>>    * Note that Flink may still decide not to use the cache if it is cheaper to do so.
>>>>>>>>>    *
>>>>>>>>>    * A CacheHandle will be returned to allow the user to release the cache actively. The cache will be deleted if there
>>>>>>>>>    * are no unreleased cache handles to it. When the TableEnvironment is closed, the cache will also be deleted
>>>>>>>>>    * and all the cache handles will be released.
>>>>>>>>>    *
>>>>>>>>>    * @return a CacheHandle referring to the cache of this table.
>>>>>>>>>    */
>>>>>>>>>   CacheHandle cache();
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> CacheHandle {
>>>>>>>>>   /**
>>>>>>>>>    * Close the cache handle. This method does not necessarily delete the cache. Instead, it simply decrements the reference counter to the cache.
>>>>>>>>>    * When there is no handle referring to a cache, the cache will be deleted.
>>>>>>>>>    *
>>>>>>>>>    * @return the number of open handles to the cache after this handle has been released.
>>>>>>>>>    */
>>>>>>>>>   int release()
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> The rationale behind this interface is the following:
>>>>>>>>> In the vast majority of cases, users wouldn't really care whether the cache is used or not. So I think the most intuitive way is letting cache() return nothing, so that nobody needs to worry about the difference between operations on CachedTables and those on the "original" tables. This will make maybe 99.9% of the users happy. There were two concerns raised for this approach:
>>>>>>>>> 1. In some rare cases, users may want to ignore the cache.
>>>>>>>>> 2. A table might be cached/uncached in a third-party function while the caller does not know.
>>>>>>>>> 
>>>>>>>>> For the first issue, users can use hint("ignoreCache") to explicitly ignore the cache.
>>>>>>>>> For the second issue, the above proposal lets cache() return a CacheHandle; the only method in it is release(). Different CacheHandles will refer to the same cache, and if a cache no longer has any cache handle, it will be deleted. This will address the following case:
>>>>>>>>> 
>>>>>>>>> {
>>>>>>>>>   val handle1 = a.cache()
>>>>>>>>>   process(a)
>>>>>>>>>   a.select(...) // cache is still available, handle1 has not been released.
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> void process(Table t) {
>>>>>>>>>   val handle2 = t.cache() // new handle to cache
>>>>>>>>>   t.select(...) // optimizer decides cache usage
>>>>>>>>>   t.hint("ignoreCache").select(...) // cache is ignored
>>>>>>>>>   handle2.release() // release the handle, but the cache may still be available if there are other handles
>>>>>>>>>   ...
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> Does the above modified approach look reasonable to you?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
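A rough, self-contained sketch of the reference counting this CacheHandle javadoc describes (names and plumbing are illustrative, not part of the proposal):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Each cache() call hands out a fresh handle to the same underlying cache;
// the cache is physically dropped only when the last handle is released.
final class RefCountedCache {
    private final AtomicInteger openHandles = new AtomicInteger();

    CacheHandle newHandle() {            // what cache() would return
        openHandles.incrementAndGet();
        return new CacheHandle();
    }

    final class CacheHandle {
        private boolean released;

        int release() {
            if (!released) {
                released = true;
                if (openHandles.decrementAndGet() == 0) {
                    System.out.println("last handle released - dropping cache");
                }
            }
            return openHandles.get();    // open handles after this release
        }
    }

    public static void main(String[] args) {
        RefCountedCache cache = new RefCountedCache();
        CacheHandle handle1 = cache.newHandle(); // caller's handle
        CacheHandle handle2 = cache.newHandle(); // handle taken inside process()
        handle2.release(); // cache survives: handle1 is still open
        handle1.release(); // ref count hits zero, cache is dropped
    }
}
```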
>>>>>>>>> 
>>>>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Becket,
>>>>>>>>>> 
>>>>>>>>>> I was aiming at semantics similar to 1. I actually thought that `cache()` would tell the system to materialize the intermediate result so that subsequent queries don't need to reprocess it. This means that the usage of the cached table in this example
>>>>>>>>>> 
>>>>>>>>>> {
>>>>>>>>>>   val cachedTable = a.cache()
>>>>>>>>>>   val b1 = cachedTable.select(…)
>>>>>>>>>>   val b2 = cachedTable.foo().select(…)
>>>>>>>>>>   val b3 = cachedTable.bar().select(...)
>>>>>>>>>>   val c1 = a.select(…)
>>>>>>>>>>   val c2 = a.foo().select(…)
>>>>>>>>>>   val c3 = a.bar().select(...)
>>>>>>>>>> }
>>>>>>>>>> 
>>>>>>>>>> strongly depends on interleaved calls which trigger the execution of sub-queries. So for example, if there is only a single env.execute call at the end of the block, then b1, b2, b3, c1, c2 and c3 would all be computed by reading directly from the sources (given that there is only a single JobGraph). It just happens that the result of `a` will be cached, such that we skip the processing of `a` when there are subsequent queries reading from `cachedTable`. If for some reason the system cannot materialize the table (e.g. running out of disk space, TTL expired), then it could also happen that we need to reprocess `a`. In that sense `cachedTable` simply is an identifier for the materialized result of `a`, with the lineage for how to reprocess it.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
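Till's "materialized result plus lineage" description above can be sketched in a few lines (hypothetical names; a Supplier stands in for re-running the DAG of `a`):

```java
import java.util.List;
import java.util.function.Supplier;

// A cached table as an identifier for a materialized result, falling back
// to the lineage (recomputing `a`) when the materialization is gone.
final class CachedResult<T> {
    private List<T> materialized;            // may be dropped at any time
    private final Supplier<List<T>> lineage; // how to reprocess `a`

    CachedResult(Supplier<List<T>> lineage) {
        this.lineage = lineage;
    }

    List<T> read() {
        if (materialized == null) {
            materialized = lineage.get();    // cache miss: reprocess from lineage
        }
        return materialized;                 // cache hit: skip reprocessing
    }

    void drop() {                            // e.g. disk full or TTL expired
        materialized = null;
    }
}
```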
>>>>>>>>>> 
>>>>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Becket,
>>>>>>>>>>> 
>>>>>>>>>>>> {
>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
>>>>>>>>>>>> Semantic 2. b uses cachedTable as the user demanded. c leaves the optimizer to choose whether the cache or the DAG should be used. In this case, the user loses the option to NOT use the cache.
>>>>>>>>>>>> 
>>>>>>>>>>>> As you can see, neither of the options seems perfect. However, I guess you and Till are proposing the third option:
>>>>>>>>>>>> 
>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.
>>>>>>>>>>> 
>>>>>>>>>>> I am pretty sure that me, Till, Fabian and others were all proposing and advocating in favour of semantic “1”. No cost-based optimiser decisions at all.
>>>>>>>>>>> 
>>>>>>>>>>> {
>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>> val b1 = cachedTable.select(…)
>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>>>>>>>>>>> val c1 = a.select(…)
>>>>>>>>>>> val c2 = a.foo().select(…)
>>>>>>>>>>> val c3 = a.bar().select(...)
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> All of b1, b2 and b3 are reading from the cache, while c1, c2 and c3 are re-executing the whole plan for “a”.
>>>>>>>>>>> 
>>>>>>>>>>> In the future we could discuss going one step further, introducing some global optimisation (that can be manually enabled/disabled): deduplicate plan nodes / deduplicate sub-queries / re-use sub-query results / or whatever we could call it. It could do two things:
>>>>>>>>>>> 
>>>>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and share the result using CachedTable - in other words, automatically insert `CachedTable cache()` calls.
>>>>>>>>>>> 2. Automatically make the decision to bypass explicit `CachedTable` access (this would be the equivalent of what you described as “semantic 3”).
>>>>>>>>>>> 
>>>>>>>>>>> However, as I wrote previously, I have big doubts whether such cost-based optimisation would work (this applies also to “Semantic 2”). I would expect it to do more harm than good in so many cases that it wouldn’t make sense.
>>>>>>>>>>> Even assuming that we calculate statistics perfectly (this ain’t gonna happen), it’s virtually impossible to correctly estimate the exchange rate of CPU cycles vs IO operations, as it changes so much from deployment to deployment.
>>>>>>>>>>> 
>>>>>>>>>>> Is this the core of our disagreement here? That you would like this “cache()” to be mostly a hint for the optimiser?
>>>>>>>>>>> 
>>>>>>>>>>> Piotrek
>>>>>>>>>>> 
>>>>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Another potential concern for semantic 3 is that, in the future, we may add automatic caching to Flink, e.g. cache the intermediate results at the shuffle boundary. If our semantic is that referencing the original table means skipping the cache, those users may not be able to benefit from the implicit cache.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the reply. Thinking about it again, I might have misunderstood your proposal in earlier emails. Returning a CachedTable might not be a bad idea.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I was more concerned about the semantics and their intuitiveness when a CachedTable is returned, i.e. if cache() returns a CachedTable, what are the semantics in the following code:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> {
>>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> What is the difference between b and c? At first glance, I see two options:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
>>>>>>>>>>>>> Semantic 2. b uses cachedTable as the user demanded. c leaves the optimizer to choose whether the cache or the DAG should be used. In this case, the user loses the option to NOT use the cache.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> As you can see, neither of the options seems perfect. However, I guess you and Till are proposing the third option:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This does address all the concerns. It is just that, from an intuitiveness perspective, I found that asking the user to explicitly use a CachedTable while the optimizer might choose to ignore it is a little weird. That was why I did not think about that semantic. But given there is material benefit, I think this semantic is acceptable.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. If we want to let the optimiser make the decision whether to use the cache or not, then why do we need a “void cache()” method at all? Would it “increase” the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce this kind of automated optimisation of “plan node deduplication”, I would turn it on globally, not per table, and let the optimiser do all of the work.
>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not-use cache decision.
>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost-based optimisations would work properly, and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We are absolutely on the same page here. An explicit cache() method is necessary not only because the optimizer may not be able to make the right decision, but also because of the nature of interactive programming. For example, if users write the following code in the Scala shell:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> val b = a.select(...)
>>>>>>>>>>>>> val c = b.select(...)
>>>>>>>>>>>>> val d = c.select(...).writeToSink(...)
>>>>>>>>>>>>> tEnv.execute()
>>>>>>>>>>>>> 
>>>>>>>>>>>>> there is no way the optimizer will know whether b or c will be used in later code, unless users hint explicitly.
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our objections of `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Are there any other side effects if we use semantic 3 mentioned above?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sorry for not responding for a long time.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regarding case 1.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> There wouldn’t be an “a.unCache()” method; I would expect only `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect `cachedTableA2`. Just as in any other database, dropping/modifying one independent table/materialised view does not affect others.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> What I meant is that assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. If we want to let the optimiser make the decision whether to use the cache or not, then why do we need a “void cache()” method at all? Would it “increase” the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce this kind of automated optimisation of “plan node deduplication”, I would turn it on globally, not per table, and let the optimiser do all of the work.
What >>>>> would be >>>>>>>>>> the >>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we >>>>> want >>>>>>>>> to >>>>>>>>>>>>>> introduce such kind automated optimisations of “plan nodes >>>>>>>>>>> deduplication” >>>>>>>>>>>>>> I would turn it on globally, not per table, and let the >>>>> optimiser >>>>>>>>> do >>>>>>>>>>> all of >>>>>>>>>>>>>> the work. >>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use >>>>>>>>> cache >>>>>>>>>>>>>> decision. >>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such >>>>> cost >>>>>>>>>>> based >>>>>>>>>>>>>> optimisations would work properly and I would still insist >>>>> first on >>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`) >>>>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()` >> doesn’t >>>>>>>>>>>>>> contradict future work on automated cost based caching. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our >>>>>>>>> objections >>>>>>>>>>> of >>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which me, >>>>> Jark, >>>>>>>>>>> Fabian, >>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Piotrek >>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com> >>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Till, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> It is true that after the first job submission, there will be >>>>> no >>>>>>>>>>>>>> ambiguity >>>>>>>>>>>>>>> in terms of whether a cached table is used or not. That is >> the >>>>>>>>> same >>>>>>>>>>> for >>>>>>>>>>>>>> the >>>>>>>>>>>>>>> cache() without returning a CachedTable. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a >>>>> caching >>>>>>>>>>>>>> operator >>>>>>>>>>>>>>>> from which you need to consume from if you want to benefit >>>>> from >>>>>>>>> the >>>>>>>>>>>>>> caching >>>>>>>>>>>>>>>> functionality. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am thinking a little differently. I think it is a hint (as >>>>> you >>>>>>>>>>>>>> mentioned >>>>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful >> about >>>>> the >>>>>>>>>>>>>> semantic >>>>>>>>>>>>>>> of the API. A hint is a property set on an existing operator, >>>>> but >>>>>>>>> is >>>>>>>>>>> not >>>>>>>>>>>>>>> itself an operator as it does not really manipulate the data. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision >>>>> which >>>>>>>>>>>>>>>> intermediate result should be cached. But especially when >>>>>>>>> executing >>>>>>>>>>>>>> ad-hoc >>>>>>>>>>>>>>>> queries the user might better know which results need to be >>>>>>>>> cached >>>>>>>>>>>>>> because >>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would >>>>> consider >>>>>>>>>> the >>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in >> the >>>>>>>>>> future >>>>>>>>>>> we >>>>>>>>>>>>>>>> might add functionality which tries to automatically cache >>>>>>>>> results >>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>>> caching the latest intermediate results until so and so much >>>>>>>>> space >>>>>>>>>> is >>>>>>>>>>>>>>>> used). But this should hopefully not contradict with >>>>> `CachedTable >>>>>>>>>>>>>> cache()`. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I agree that cache() method is needed for exactly the reason >>>>> you >>>>>>>>>>>>>> mentioned, >>>>>>>>>>>>>>> i.e. 
>>>>>>>>>>>>>>> To explain the difference between returning / not returning a CachedTable, I want to compare the following two cases:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 1: returning a CachedTable*
>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // Does the user specify that the original DAG is used? Or does the optimizer decide whether the DAG or the cache should be used?
>>>>>>>>>>>>>>> d = cachedTableA1.filter() // User specifies that the cached table is used.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards?
>>>>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 2: not returning a CachedTable*
>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In case 1, semantics-wise, the optimizer loses the option to choose between the DAG and the cache. And the unCache() call becomes tricky.
>>>>>>>>>>>>>>> In case 2, users do not need to worry about whether the cache or the DAG is used. And the unCache() semantics are clear. However, the caveat is that users cannot explicitly ignore the cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In order to address the issues mentioned in case 2, and inspired by the discussion so far, I am thinking about using a hint to allow the user to explicitly ignore the cache. We do not have hints yet, but we probably should have them. So the code becomes:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 3: returning this table*
>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used instead of the cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> We could also let cache() return this table to allow chained method calls. Do you think this API addresses the concerns?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> All the recent discussions are focused on whether there is a problem if cache() does not return a Table. It seems that returning a Table explicitly is clearer (and safer?).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> So are there any problems if cache() returns a Table? @Becket
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the original DAG that generates a. But all subsequent operators (when running multiple queries) which reference cachedTableA should not need to reproduce `a` but directly consume the intermediate result.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries, the user might know better which results need to be cached, because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so-and-so much space is used). But this should hopefully not contradict `CachedTable cache()`.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Till
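Till's "caching operator" picture can be sketched as explicit plan nodes (hypothetical names, not Flink internals):

```java
import java.util.Arrays;
import java.util.List;

// cache() inserts an explicit node into the plan; only tables built on top
// of the CacheNode consume the materialized result, everything else keeps
// reading from the source.
abstract class PlanNode {
    final List<PlanNode> inputs;
    PlanNode(PlanNode... inputs) { this.inputs = Arrays.asList(inputs); }
}

final class Source extends PlanNode { }
final class MapOp extends PlanNode { MapOp(PlanNode in) { super(in); } }
final class CacheNode extends PlanNode { CacheNode(PlanNode in) { super(in); } }

class CachingOperatorDemo {
    public static void main(String[] args) {
        PlanNode a = new Source();
        PlanNode b = new MapOp(a);            // b = a.map(...)       -> reads the source
        PlanNode cachedA = new CacheNode(a);  // cachedA = a.cache()  -> materialization point
        PlanNode d = new MapOp(cachedA);      // d = cachedA.map(...) -> reads the cache
        System.out.println(d.inputs.get(0) instanceof CacheNode); // true
    }
}
```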
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little confused.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might become:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> cachedTableA = a.cache()
>>>>>>>>>>>>>>>>>> d = cachedTableA.map(...)
>>>>>>>>>>>>>>>>>> e = a.map()
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d and e are all going to be reading from the original DAG that generates a. But with a naive expectation, d should be reading from the cache. This does not seem to solve the potential confusion you raised, right?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Just to be clear, my understanding is all based on the assumption that the tables are immutable. Therefore, after a.cache(), the *cachedTableA* and the original table *a* should be completely interchangeable.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> That said, I think a valid argument is optimization. There are indeed cases where reading from the original DAG could be faster than reading from the cache. For example:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> a.filter(f1' > 100)
>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>> b = a.filter(f1' < 100)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Ideally the optimizer should be intelligent enough to decide which way is faster, without user intervention. In this case, it will identify that b would just be an empty table, and thus skip reading from the cache completely. But I agree that returning a CachedTable would give the user control over when to use the cache, even though I still feel that letting the optimizer handle this is the better option in the long run.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Yes, you are right Becket that it still depends on the actual execution of the job whether a consumer reads from a cached result or not.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> My point was actually about the properties of a (cached vs. non-cached) and not about the execution. I would not make cache trigger the execution of the job, because one loses some flexibility by eagerly triggering the execution.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I tried to argue for an explicit CachedTable which is returned by the cache() method, like Piotr did, in order to make the API more explicit.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> That is a good example. Just a minor correction: in this case, b, c and d will all consume from a non-cached a. This is because the cache will only be created on the very first job submission that generates the table to be cached.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If I understand correctly, this example is about whether the .cache() method should be eagerly evaluated or lazily evaluated. In other words, if the cache() method actually triggers a job that creates the cache, there will be no such confusion. Is that right?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> In the example, although d will not consume from the cached Table while it looks like it is supposed to, from a correctness perspective the code will still return the correct result, assuming that tables are immutable.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Personally I feel it is OK, because users probably won't really worry about whether the table is cached or not. And a lazy cache could avoid some unnecessary caching if a cached table is never created in the user application. But I am not opposed to doing eager evaluation of the cache.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Another argument for Piotr's point is that lazily changing properties of a node affects all downstream consumers, but does not necessarily have to happen before these consumers are defined. From a user's perspective this can be quite confusing:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>>>>> d = a.map(...)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Now b, c and d will consume from a cached operator. In this case, the user would most likely expect that only d reads from a cached result.
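The confusion described here comes from shared mutable state in the plan. A tiny sketch (hypothetical names) of why a mutating `void cache()` retroactively affects b and c:

```java
// b, c and d all keep a reference to the same mutable plan node, so a
// later a.cache() flips the flag under consumers defined *before* the call.
final class Node {
    boolean cacheHint;   // mutated in place by void cache()
}

final class MutTable {
    final Node node;     // upstream node this table reads from (simplified)
    MutTable(Node node) { this.node = node; }

    MutTable map() { return new MutTable(node); } // consumer of `node`
    void cache()   { node.cacheHint = true; }     // side effect seen by all consumers

    public static void main(String[] args) {
        MutTable a = new MutTable(new Node());
        MutTable b = a.map();
        MutTable c = a.map();
        a.cache();           // happens *after* b and c were defined
        MutTable d = a.map();
        System.out.println(b.node.cacheHint + " " + c.node.cacheHint + " " + d.node.cacheHint);
        // prints "true true true" - not the "false false true" a user might expect
    }
}
```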
On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hey Shaoxuan and Becket,

> Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?

Not only that. There are also performance implications, and those are another implicit side effect of using `void cache()`. As I wrote before, reading from the cache might not always be desirable, so it can cause performance degradation and I'm fine with that - it is the user's or the optimiser's choice. What I do not like is that this implicit side effect can manifest in a completely different part of the code that wasn't touched by the user while he was adding the `void cache()` call somewhere else. And even if caching improves performance, it is still a side effect of `void cache()`. Almost by definition, `void` methods have only side effects. As I wrote before, there are a couple of scenarios where this might be undesirable and/or unexpected, for example:

1.

Table b = …;
b.cache()
x = b.join(…)
y = b.count()
// ...
// one hundred lines of code later
z = b.filter(…).groupBy(…) // this might even be hidden in a different method/file/package/dependency

2.

Table b = ...
if (some_condition) {
  foo(b)
} else {
  bar(b)
}
z = b.filter(…).groupBy(…)

void foo(Table b) {
  b.cache()
  // do something with b
}

In both of the above examples, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` (the semantics of the program, in case the sources are mutable, as well as its performance), which might be far from obvious.

On top of that, there is still my argument that having a `MaterializedTable` or `CachedTable` handle is more flexible for us in the future and for the user (as a manual option to bypass cache reads).

> But Jiangjie is correct, the source table in batching should be immutable. It is the user's responsibility to ensure it, otherwise even a regular failover may lead to inconsistent results.

Yes, I agree that's what a perfect world/good deployment should look like. But it often isn't, and while I'm not trying to fix this (since the proper fix is to support transactions), I'm just trying to minimise the confusion for users that are not fully aware of what's going on and operate in a less than perfect setup. And if something bites them after adding a `b.cache()` call, I want to make sure that they at least know all of the places that adding this line can affect.

Thanks, Piotrek
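[Editor's note: one way to read the second example above is that a returned handle moves the side effect into the function's signature, where the caller can see it. A sketch under the same hypothetical stand-in API as before.]

```
interface Table {
    Table filter(String predicate);
    CachedTable cache();
}

interface CachedTable extends Table {}

class SideEffectScope {
    // With a handle, whatever foo() caches is visible in its return type.
    static CachedTable foo(Table b) {
        CachedTable cached = b.cache();
        // ... do something with `cached`
        return cached;
    }

    static void caller(Table b, boolean someCondition) {
        if (someCondition) {
            foo(b);
        }
        // `b` itself is untouched: this line behaves the same whether or not
        // foo() ran, unlike with a void cache() that re-routes all reads of b.
        Table z = b.filter("f1 > 100");
    }
}
```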
On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

Thanks again for the clarification. Some more replies follow.

> But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching.

That is true. Actually, in stream processing cache() has the same semantics as in batch processing. The semantics are the following: for a table created via a series of computations, save that table for later reference, to avoid re-running the computation logic to regenerate the table. Once the application exits, drop all the caches.

These semantics are the same for both batch and stream processing. The difference is that stream applications will only run once, as they are long running. Batch applications, on the other hand, may be run multiple times, hence the cache may be created and dropped each time the application runs.

Admittedly, there will probably be some resource management requirements for a cached streaming table, such as time based / size based retention, to address the infinite-data issue. But such requirements do not change the semantics.

You are right that interactive programming is just one use case of cache(). It is not the only use case.
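[Editor's note: a sketch of that lifecycle, again with hypothetical stand-in types: a cache lives for one session/application and is dropped when it exits.]

```
interface Table {
    Table select(String fields);
    Table cache();
}

interface TableEnvironment {
    Table scan(String name);
    void execute(); // submits a job
    void close();   // session ends; at the latest here, all caches are dropped
}

class CacheLifecycle {
    static void run(TableEnvironment env) {
        Table t = env.scan("source").select("key, total").cache();
        env.execute(); // first job: computes t and populates the cache
        env.execute(); // later jobs in the same session reuse the cache
        env.close();   // application exits: caches are dropped
        // A re-run of a batch application starts with an empty cache again;
        // a streaming application runs once, so its cache lives as long as
        // the job (modulo retention policies for unbounded data).
    }
}
```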
> For me the more important issue is of not having the `void cache()` with side effects.

This is indeed the key point. The argument around whether cache() should return something already indicates that cache() and materialize() address different issues. Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?

> I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user can not write to views or materialised views in SQL, or that a user currently can not write to a Table.

I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering about is the following two facts:

1. If and only if a table is mutable (with something like insert()), a CachedTable may have implicit behavior.
2. A CachedTable extends a Table.

From these we can come to the conclusion that a CachedTable is mutable and users can insert into the CachedTable directly. This is where I think it gets confusing.

Thanks,

Jiangjie (Becket) Qin
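[Editor's note: one possible way out of that contradiction is to keep the write path out of the base Table type altogether, so CachedTable can extend Table and still be read-only by construction. A hypothetical sketch, not actual Flink API.]

```
interface Table {
    Table filter(String predicate); // reads and transformations only
    CachedTable cache();
}

interface WritableTable extends Table {
    void insert(String values); // illustrative mutation method
}

interface CachedTable extends Table {
    // CachedTable extends the read-only Table, not WritableTable, so there
    // is simply no insert() to call on a cache: fact 1 no longer follows
    // from fact 2.
}
```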
On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi all,

Regarding naming `cache()` vs `materialize()`: one more explanation of why `materialize()` feels more natural to me is that I think of all "Table"s in the Table API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session, which is limited by a different execution model. That's why "caching" a view, for me, is just materialising it.

However, I see and understand your point of view. Coming from DataSet/DataStream and the generally non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching. Naming is one issue though, and not that critical to me. Especially since, once we implement proper materialised views, we can always deprecate/rename `cache()` if we deem so.

For me the more important issue is not having a `void cache()` with side effects, exactly for the reasons that you have mentioned. True: results might be non-deterministic if the underlying source tables are changing. The problem is that `void cache()` implicitly changes the semantics of subsequent uses of the cached/materialized Table. It can cause a "wtf" moment for a user if he inserts a "b.cache()" call in some place in his code and suddenly some other random places are behaving differently. If `materialize()` or `cache()` returns a Table handle, we force the user to explicitly use the cache, which removes the "random" part from the "suddenly some other random places are behaving differently".

This argument, and the others that I've raised (greater flexibility / allowing the user to explicitly bypass the cache), are independent of the `cache()` vs `materialize()` discussion.

> Does that mean one can also insert into the CachedTable? This sounds pretty confusing.

I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user can not write to views or materialised views in SQL, or that a user currently can not write to a Table.

Piotrek

On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

I agree with @Becket that `cache()` and `materialize()` should be considered as two different methods, where the latter one is more sophisticated.

According to my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but as the Table API is a high-level API, it's natural for us to think in a SQL way.

Maybe we can add the `cache()` method to the DataSet API and force users to translate a Table to a DataSet before caching it.
Then the users should manually register the cached DataSet as a table again (we may need some table replacement mechanism for datasets with an identical schema but different contents here). After all, it's the dataset rather than the dynamic table that needs to be cached, right?

Best,
Xingcan
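[Editor's note: a sketch of the suggested workaround. All types below are stand-ins; in particular, DataSet#cache() does not exist in Flink and is exactly the hypothetical piece under discussion.]

```
interface Table {}
interface DataSet { DataSet cache(); } // cache() is the hypothetical part

interface TableEnv {
    DataSet toDataSet(Table t);    // Table -> DataSet conversion
    Table fromDataSet(DataSet ds); // register a DataSet as a Table again
}

class DataSetWorkaround {
    static Table cacheViaDataSet(TableEnv tEnv, Table t) {
        DataSet cached = tEnv.toDataSet(t).cache();
        // Re-registering yields a new Table; transparently swapping it in for
        // the original would need the "table replacement" mechanism above.
        return tEnv.fromDataSet(cached);
    }
}
```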
On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek and Jark,

Thanks for the feedback and explanation. Those are good arguments, but I think they are mostly about materialized views. Let me try to explain why I believe cache() and materialize() are different.

I think cache() and materialize() have quite different implications. An analogy I can think of is save() vs publish(). When users call cache(), it is just like they are saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning. Calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), that means "I have something meaningful to be reused by others"; now users need to think about validation, update & versioning, the lifecycle of the result, etc.

Piotrek's suggestions on variations of the materialize() methods are very useful. It would be great if Flink had them. The concept of a materialized view is actually a pretty big feature, not to mention the related stuff like the triggers/hooks you mentioned earlier. I think materialized views should be discussed in a more thorough and systematic manner. And I find that discussion kind of orthogonal to, and way beyond, the interactive programming experience.

The example you gave was interesting. I still have some questions, though.

> Table source = … // some source that scans files from a directory "/foo/bar/"
> Table t1 = source.groupBy(…).select(…).where(…) ….;
> Table t2 = t1.materialize() // (or `cache()`)
> t2.count() // initialise cache (if it's lazily initialised)
> int a1 = t1.count()
> int b1 = t2.count()
> // something in the background (or we trigger it) writes new files to /foo/bar
> int a2 = t1.count()
> int b2 = t2.count()
> t2.refresh() // possible future extension, not to be implemented in the initial version

What if someone else added some more files to /foo/bar at this point? In that case, a3 won't equal b3, and the result becomes non-deterministic, right?

> int a3 = t1.count()
> int b3 = t2.count()
> t2.drop() // another possible future extension, manual "cache" dropping

When we talk about interactive programming, in most cases we are talking about batch applications. A fundamental assumption in such a case is that the source data is complete before the data processing begins, and the data will not change during the processing. IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added.
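[Editor's note: the union approach from the last paragraph could look like the following; unionAll mirrors the Table API's union semantics, and the types are stand-ins.]

```
interface Table {
    Table unionAll(Table other);
}

class AppendViaUnion {
    static Table withNewRows(Table completeSnapshot, Table newRows) {
        // The original snapshot stays immutable; each run derives its input
        // explicitly, so the result is deterministic for that run.
        return completeSnapshot.unionAll(newRows);
    }
}
```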
There are a few cases where computations are executed repeatedly on a changing data source.

For example, people may run an ML training job every hour with the samples newly added in the past hour. In that case, the source data between runs will indeed change. But still, the data remains unchanged within one run. And usually in that case the result will need versioning, i.e. for a given result, it tells you that this is the result derived from the source data as of a certain timestamp.

Another example is something like a data warehouse. In this case, there are a few sources of original/raw data. On top of those sources, many materialized views / queries / reports / dashboards can be created to generate derived data. That derived data needs to be updated when the underlying original data changes. In that case, the processing logic that derives the data needs to be executed repeatedly to update those reports/views. Again, all that derived data also needs to have versioning.