Hi,

I think that introducing ref counting could be confusing and error prone, since Flink Table's users are not used to closing/releasing resources. I was more objecting to placing the `uncache()`/`dropCache()`/`releaseCache()` method (releaseCache sounds best to me) on the `Table` itself. It might not be obvious that it will drop the cache for all of the usages of the given table. For example:

public void foo(Table t) {
    // …
    t.releaseCache();
}

public void bar(Table t) {
    // ...
}

Table a = …
val cachedA = a.cache()

foo(cachedA)
bar(cachedA)

My problem with the above example is that the `t.releaseCache()` call is not doing the best possible job in communicating to the user that it will have side effects on other places, like the `bar(cachedA)` call. Something like this might be better (not perfect, but just a bit better):

public void foo(Table t, CacheService cacheService) {
    // …
    cacheService.releaseCacheFor(t);
}

Table a = …
val cachedA = a.cache()

foo(cachedA, env.getCacheService())
bar(cachedA)
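To make this a bit more concrete, a rough sketch of what such a service could look like - all names here are placeholders, not a worked-out proposal (`Table` stands for the Table API class):

interface CacheService {
    // Drop the cache that backs the given table - for ALL usages of it,
    // not only for the reference that was passed in here.
    void releaseCacheFor(Table table);
}

With this shape, a function that is able to drop a cache has to declare that in its signature, instead of hiding the side effect behind an innocent-looking method on `Table`.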
Also from another perspective, maybe placing the `releaseCache()` method in Table is not the best separation of concerns - the `releaseCache()` method seems significantly different compared to the other existing methods.

Piotrek

> On 8 Jan 2019, at 12:28, Becket Qin <becket....@gmail.com> wrote:
>
> Hi Piotr,
>
> You are right. There might be two intuitive meanings when users call 'a.uncache()', namely:
> 1. release the resource
> 2. do not use cache for the next operation.
>
> Case (1) would likely be the dominant use case. So I would suggest we dedicate the uncache() method to case (1), i.e. for resource release, but not for ignoring cache.
>
> For case (2), i.e. explicitly ignoring cache (which is rare), users may use something like 'hint("ignoreCache")'. I think this is better, as it is a little weird for users to call `a.uncache()` while they may not even know if the table is cached at all.
>
> Assuming we let `uncache()` only release the resource, one possibility is using a ref count to mitigate the side effect. That means a ref count is incremented on `cache()` and decremented on `uncache()`, so `uncache()` does not physically release the resource immediately, but just means the cache could be released.
> That being said, I am not sure if this is really a better solution, as it seems a little counter-intuitive. Maybe calling it releaseCache() helps a little bit?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski <pi...@da-platform.com> wrote:
>
>> Hi Becket,
>>
>> With `uncache` there are probably two features that we can think about:
>>
>> a)
>>
>> Physically dropping the cached table from the storage, freeing up the resources
>>
>> b)
>>
>> Hinting the optimizer to not cache the reads for the next query/table
>>
>> a) has the issue that I wrote about before: it seems to be an operation inherently "flawed" by having side effects.
>>
>> I'm not sure how it would be best to express this. We could make it work:
>>
>> 1. via a method on a Table, as you proposed:
>>
>> void Table#dropCache()
>> void Table#uncache()
>>
>> 2. as an operation on the environment:
>>
>> env.dropCacheFor(table) // or some other argument that allows the user to identify the desired cache
>>
>> 3. by extending (from your original design doc) the `setTableService` method to return some control handle, like:
>>
>> TableServiceControl setTableService(TableFactory tf,
>>                                     TableProperties properties,
>>                                     TempTableCleanUpCallback cleanUpCallback);
>>
>> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
>>
>> and having the drop cache method there:
>>
>> TableServiceControl#dropCache(table)
>>
>> Out of those options, option 1 might have the disadvantage of kind of not making the user aware that this is a global operation with side effects.
>> Like the old example of:
>>
>> public void foo(Table t) {
>>     // …
>>     t.dropCache();
>> }
>>
>> It might not be immediately obvious that `t.dropCache()` is some kind of global operation, with side effects visible outside of the `foo` function.
>>
>> On the other hand, both options 2 and 3 might have a greater chance of catching the user's attention:
>>
>> public void foo(Table t, CacheService cacheService) {
>>     // …
>>     cacheService.dropCache(t);
>> }
>>
>> b) could be achieved quite easily:
>>
>> Table a = …
>> val notCached1 = a.doNotCache()
>> val cachedA = a.cache()
>> val notCached2 = cachedA.doNotCache() // equivalent of notCached1
>>
>> `doNotCache()` would behave similarly to `cache()` - return a copy of the table with the "cache" hint removed and/or a "never cache" hint added.
>>
>> Piotrek
>>
>>> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for the proposal and detailed explanation. I like the idea of returning a new hinted Table without modifying the original table. This also leaves room for users to benefit from future implicit caching.
>>>
>>> Just to make sure I get the full picture: in your proposal, there will also be a 'void Table#uncache()' method to release the cache, right?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com> wrote:
>>>
>>>> Hi Becket!
>>>>
>>>> After further thinking I tend to agree that my previous proposal (*Option 2*) indeed might not be ideal if we would introduce automatic caching in the future. However I would like to propose a slightly modified version of it:
>>>>
>>>> *Option 4*
>>>>
>>>> Adding a `cache()` method with the following signature:
>>>>
>>>> Table Table#cache();
>>>>
>>>> It has no side effects, and the `cache()` call does not modify/change the original Table in any way. It would return a copy of the original table, with an added hint for the optimizer to cache the table, so that future accesses to the returned table might be cached or not.
>>>>
>>>> Assuming that we are talking about a setup where we do not have automatic caching enabled (a possible future extension):
>>>>
>>>> Example #1:
>>>>
>>>> ```
>>>> Table a = …
>>>> a.foo() // not cached
>>>>
>>>> val cachedA = a.cache();
>>>>
>>>> cachedA.bar() // maybe cached
>>>> a.foo() // same as before - effectively not cached
>>>> ```
>>>>
>>>> Both the first and the second `a.foo()` operations would behave in exactly the same way. Again, the `a.cache()` call doesn't affect `a` itself. If `a` was not hinted for caching before `a.cache();`, then both `a.foo()` calls wouldn't use cache.
>>>>
>>>> The returned `cachedA` would be hinted with the "cache" hint, so `cachedA.bar()` would probably go through the cache (unless the optimiser decides the opposite).
>>>>
>>>> Example #2
>>>>
>>>> ```
>>>> Table a = …
>>>>
>>>> a.foo() // not cached
>>>>
>>>> val b = a.cache();
>>>>
>>>> a.foo() // same as before - effectively not cached
>>>> b.foo() // maybe cached
>>>>
>>>> val c = b.cache();
>>>>
>>>> a.foo() // same as before - effectively not cached
>>>> b.foo() // same as before - effectively maybe cached
>>>> c.foo() // maybe cached
>>>> ```
>>>>
>>>> Now, assuming that we have some future "automatic caching optimisation":
>>>>
>>>> Example #3
>>>>
>>>> ```
>>>> env.enableAutomaticCaching()
>>>> Table a = …
>>>>
>>>> a.foo() // might be cached, depending on whether `a` was selected for automatic caching
>>>>
>>>> val b = a.cache();
>>>>
>>>> a.foo() // same as before - might be cached, if `a` was selected for automatic caching
>>>> b.foo() // maybe cached
>>>> ```
>>>>
>>>> More or less this is the same behaviour as:
>>>>
>>>> Table a = ...
>>>> val b = a.filter(x > 20)
>>>>
>>>> Calling `filter` hasn't changed or altered `a` in any way. If `a` was previously filtered:
>>>>
>>>> Table src = …
>>>> val a = src.filter(x > 20)
>>>> val b = a.filter(x > 20)
>>>>
>>>> then yes, `a` and `b` will be the same. But the point is that neither `filter` nor `cache` changes the original `a` table.
>>>>
>>>> One thing is that, indeed, the physical drop-cache operation will have side effects and will in a way mutate the cached table references. But this is, I think, unavoidable in any solution - the same issue as calling `.close()`, or calling a destructor in C++.
>>>>
>>>> Piotrek
>>>>
>>>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>> Happy New Year, everybody!
>>>>>
>>>>> I would like to resume this discussion thread. At this point, we have agreed on the first step goal of interactive programming. The open discussion is the exact API. More specifically, what should the *cache()* method return and what is its semantic. There are three options:
>>>>>
>>>>> *Option 1*
>>>>> *void cache()* OR *Table cache()* which returns the original table for chained calls.
>>>>> *void uncache()* releases the cache.
>>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>>>>
>>>>> - Semantic: a.cache() hints that table 'a' should be cached. The optimizer decides whether the cache will be used or not.
>>>>> - pros: simple and no confusion between CachedTable and the original table
>>>>> - cons: a table may be cached / uncached in a method invocation, while the caller does not know about this.
>>>>>
>>>>> *Option 2*
>>>>> *CachedTable cache()*
>>>>> *CachedTable* extends *Table* with an additional *uncache()* method
>>>>>
>>>>> - Semantic: after *val cachedA = a.cache()*, *cachedA.foo()* will always use the cache. *a.bar()* will always use the original DAG.
>>>>> - pros: no potential side effects in method invocations.
>>>>> - cons: the optimizer has no chance to kick in. Future optimization will become a behavior change and need users to change the code.
>>>>>
>>>>> *Option 3*
>>>>> *CacheHandle cache()*
>>>>> *CacheHandle.release()* to release a cache handle on the table. If all cache handles are released, the cache could be removed.
>>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>>>>
>>>>> - Semantic: *a.cache()* hints that 'a' should be cached.
>>>>> The optimizer decides whether the cache will be used or not. The cache is released either when no handle is on it, or when the user program exits.
>>>>> - pros: no potential side effects in method invocations. No confusion between the cached table vs. the original table.
>>>>> - cons: an additional CacheHandle exposed to the users.
>>>>>
>>>>> Personally I prefer option 3 for the following reasons:
>>>>> 1. It is simple. The vast majority of the users would just call *a.cache()* followed by *a.foo()*, *a.bar()*, etc.
>>>>> 2. There is no semantic ambiguity and no semantic change if we decide to add implicit cache in the future.
>>>>> 3. There is no side effect in the method calls.
>>>>> 4. Admittedly we need to expose one more CacheHandle class to the users. But it is not that difficult to understand given similar well-known concepts like ref counting (we can name it CacheReference if that is easier to understand). So I think it is fine.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>>> Hi Piotrek,
>>>>>>
>>>>>> 1. Regarding optimization.
>>>>>> Sure, there are many cases where the decision is hard to make. But that does not make it any easier for the users to make those decisions. I imagine 99% of the users would just naively use cache. I am not saying we can optimize in all the cases. But as long as we agree that at least in certain cases (I would argue most cases) the optimizer can do a little better than an average user, who likely knows little about Flink internals, we should not push the burden of optimization to users.
>>>>>>
>>>>>> BTW, it seems some of your concerns are related to the implementation. I did not mention the implementation of the caching service because that should not affect the API semantic. Not sure if this helps, but imagine the default implementation has one StorageNode service colocating with each TM. It could be running within the TM process or in a standalone process, depending on configuration.
>>>>>>
>>>>>> The StorageNode uses a memory + spill-to-disk mechanism. The cached data will just be written to the local StorageNode service. If the StorageNode is running within the TM process, the in-memory cache could just be objects, so we save some serde cost. A later job referring to the cached Table will be scheduled in a locality-aware manner, i.e. run in the TM whose peer StorageNode hosts the data.
>>>>>>
>>>>>> 2. Semantic
>>>>>> I am not sure why introducing a new hintCache() or env.enableAutomaticCaching() method would avoid the consequence of a semantic change.
>>>>>>
>>>>>> If the auto optimization is not enabled by default, users still need to make code changes to all existing programs in order to get the benefit.
>>>>>> If the auto optimization is enabled by default, advanced users who know that they really want to use cache will suddenly lose the opportunity to do so, unless they change the code to disable auto optimization.
>>>>>>
>>>>>> 3. Side effect
>>>>>> The CacheHandle is not only about where to put uncache(). It solves the implicit performance impact by moving the uncache() to the CacheHandle.
>>>>>>
>>>>>> - If users want to leverage cache, they can call a.cache(). After that, unless the user explicitly releases that CacheHandle, a.foo() will always leverage the cache if needed (the optimizer may choose to ignore the cache if that helps accelerate the process). Any function call will not be able to release the cache, because it does not have that CacheHandle.
>>>>>> - If some advanced users do not want to use the cache at all, they will call a.hint(ignoreCache).foo(). This will for sure ignore the cache and use the original DAG to process.
>>>>>>
>>>>>>> In vast majority of the cases, users wouldn't really care whether the cache is used or not.
>>>>>>> I wouldn't agree with that, because "caching" (if not purely in-memory caching) would add additional IO costs. It's similar to saying that users would not see a difference between Spark/Flink and MapReduce (MapReduce writes data to disks after every map/reduce stage).
>>>>>>
>>>>>> What I wanted to say is that in most cases, after users call cache(), they don't really care about whether auto optimization has decided to ignore the cache or not, as long as the program runs faster.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the quick answer :)
>>>>>>>
>>>>>>> Re 1.
>>>>>>>
>>>>>>> I generally agree with you, however a couple of points:
>>>>>>>
>>>>>>> a) the problem with using automatic caching is bigger, because you will have to decide how you compare IO vs CPU costs, and if you pick wrong, the additional IO costs might be enormous or can even crash your system. This is a more difficult problem compared to, let's say, join reordering, where the only issue is to have good statistics that can capture correlations between columns (when you reorder joins, the number of IO operations does not change)
>>>>>>> c) your example is completely independent of caching.
>>>>>>>
>>>>>>> A query like this:
>>>>>>>
>>>>>>> src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 === `f2).as('f3, …).filter('f3 > 30)
>>>>>>>
>>>>>>> should/could be optimised to an empty result immediately, without the need for any cache/materialisation, and that should work even without any statistics provided by the connector.
>>>>>>>
>>>>>>> For me a prerequisite to any serious cost-based optimisations would be some reasonable benchmark coverage of the code (tpch?). Otherwise that would be the equivalent of adding untested code, since we wouldn't be able to verify our assumptions, like how does the writing of 10 000 records to a cache/RocksDB/Kafka/CSV file compare to joining/filtering/processing of, let's say, 1 000 000 rows.
>>>>>>>
>>>>>>> Re 2.
>>>>>>>
>>>>>>> I wasn't proposing to change the semantic later. I was proposing that
I was proposing that >>>> we >>>>>>> start now: >>>>>>> >>>>>>> CachedTable cachedA = a.cache() >>>>>>> cachedA.foo() // Cache is used >>>>>>> a.bar() // Original DAG is used >>>>>>> >>>>>>> And then later we can think about adding for example >>>>>>> >>>>>>> CachedTable cachedA = a.hintCache() >>>>>>> cachedA.foo() // Cache might be used >>>>>>> a.bar() // Original DAG is used >>>>>>> >>>>>>> Or >>>>>>> >>>>>>> env.enableAutomaticCaching() >>>>>>> a.foo() // Cache might be used >>>>>>> a.bar() // Cache might be used >>>>>>> >>>>>>> Or (I would still not like this option): >>>>>>> >>>>>>> a.hintCache() >>>>>>> a.foo() // Cache might be used >>>>>>> a.bar() // Cache might be used >>>>>>> >>>>>>> Or whatever else that will come to our mind. Even if we add some >>>>>>> automatic caching in the future, keeping implicit (`CachedTable >>>> cache()`) >>>>>>> caching will still be useful, at least in some cases. >>>>>>> >>>>>>> Re 3. >>>>>>> >>>>>>>> 2. The source tables are immutable during one run of batch >> processing >>>>>>> logic. >>>>>>>> 3. The cache is immutable during one run of batch processing logic. >>>>>>> >>>>>>>> I think assumption 2 and 3 are by definition what batch processing >>>>>>> means, >>>>>>>> i.e the data must be complete before it is processed and should not >>>>>>> change >>>>>>>> when the processing is running. >>>>>>> >>>>>>> I agree that this is how batch systems SHOULD be working. However I >>>> know >>>>>>> from my previous experience that it’s not always the case. Sometimes >>>> users >>>>>>> are just working on some non transactional storage, which can be >>>> (either >>>>>>> constantly or occasionally) being modified by some other processes >> for >>>>>>> whatever the reasons (fixing the data, updating, adding new data >> etc). >>>>>>> >>>>>>> But even if we ignore this point (data immutability), performance >> side >>>>>>> effect issue of your proposal remains. If user calls `void a.cache()` >>>> deep >>>>>>> inside some private method, it will have implicit side effects on >> other >>>>>>> parts of his program that might not be obvious. >>>>>>> >>>>>>> Re `CacheHandle`. >>>>>>> >>>>>>> If I understand it correctly, it only addresses the issue where to >>>> place >>>>>>> method `uncache`/`dropCache`. >>>>>>> >>>>>>> Btw, >>>>>>> >>>>>>>> In vast majority of the cases, users wouldn't really care whether >> the >>>>>>> cache is used or not. >>>>>>> >>>>>>> I wouldn’t agree with that, because “caching” (if not purely in >> memory >>>>>>> caching) would add additional IO costs. It’s similar as saying that >>>> users >>>>>>> would not see a difference between Spark/Flink and MapReduce >> (MapReduce >>>>>>> writes data to disks after every map/reduce stage). >>>>>>> >>>>>>> Piotrek >>>>>>> >>>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote: >>>>>>>> >>>>>>>> Hi Piotrek, >>>>>>>> >>>>>>>> Not sure if you noticed, in my last email, I was proposing >>>> `CacheHandle >>>>>>>> cache()` to avoid the potential side effect due to function calls. >>>>>>>> >>>>>>>> Let's look at the disagreement in your reply one by one. >>>>>>>> >>>>>>>> >>>>>>>> 1. Optimization chances >>>>>>>> >>>>>>>> Optimization is never a trivial work. This is exactly why we should >>>> not >>>>>>> let >>>>>>>> user manually do that. Databases have done huge amount of work in >> this >>>>>>>> area. At Alibaba, we rely heavily on many optimization rules to >> boost >>>>>>> the >>>>>>>> SQL query performance. 
>>>>>>>>
>>>>>>>> In your example, if I fill in the filter conditions in a certain way, the optimization becomes obvious:
>>>>>>>>
>>>>>>>> Table src1 = … // read from connector 1
>>>>>>>> Table src2 = … // read from connector 2
>>>>>>>>
>>>>>>>> Table a = src1.filter('f1 > 10).join(src2.filter('f2 < 30), `f1 === `f2).as('f3, ...)
>>>>>>>> a.cache() // write cache to connector 3; when writing the records, remember min and max of `f1
>>>>>>>>
>>>>>>>> a.filter('f3 > 30) // There is no need to read from any connector, because `a` does not contain any record whose 'f3 is greater than 30.
>>>>>>>> env.execute()
>>>>>>>> a.select(…)
>>>>>>>>
>>>>>>>> BTW, it seems to me that adding some basic statistics is fairly straightforward, and the cost is pretty marginal if not negligible. In fact it is not only needed for optimization, but also for cases such as ML, where some algorithms may need to decide their parameters based on the statistics of the data.
>>>>>>>>
>>>>>>>> 2. Same API, one semantic now, another semantic later.
>>>>>>>>
>>>>>>>> I am trying to understand what the semantic of the `CachedTable cache()` you are proposing is. IMO, we should avoid designing an API whose semantic will be changed later. If we have a "CachedTable cache()" method, then the semantic should be very clearly defined upfront and not change later. It should never be "right now let's go with semantic 1, later we can silently change it to semantic 2 or 3". Such a change could have bad consequences. For example, let's say we decide to go with semantic 1:
>>>>>>>>
>>>>>>>> CachedTable cachedA = a.cache()
>>>>>>>> cachedA.foo() // Cache is used
>>>>>>>> a.bar() // Original DAG is used.
>>>>>>>>
>>>>>>>> Now the majority of the users would be using cachedA.foo() in their code. And some advanced users will use a.bar() to explicitly skip the cache. Later on, we add smart optimization and change the semantic to semantic 2:
>>>>>>>>
>>>>>>>> CachedTable cachedA = a.cache()
>>>>>>>> cachedA.foo() // Cache is used
>>>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip the cache if it is faster.
>>>>>>>>
>>>>>>>> Now most of the users who were writing cachedA.foo() will not benefit from this optimization at all, unless they change their code to use a.foo() instead. And those advanced users suddenly lose the option to explicitly ignore the cache, unless they change their code (assuming we care enough to provide something like hint(useCache)). If we don't define the semantic carefully, our users will have to change their code again and again, while they shouldn't have to.
>>>>>>>>
>>>>>>>> 3. Side effects
>>>>>>>>
>>>>>>>> Before we talk about side effects, we have to agree on the assumptions. The assumptions I have are the following:
>>>>>>>> 1. We are talking about batch processing.
>>>>>>>> 2. The source tables are immutable during one run of batch processing logic.
>>>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>>>>>
>>>>>>>> I think assumptions 2 and 3 are by definition what batch processing means, i.e. the data must be complete before it is processed and should not change while the processing is running.
>>>>>>>>
>>>>>>>> As far as I am aware, no batch processing system breaks those assumptions. Even for relational database tables, where queries can run with concurrent modifications, the necessary locking is still required to ensure the integrity of the query result.
>>>>>>>>
>>>>>>>> Please let me know if you disagree with the above assumptions. If you agree with these assumptions, do you still see side effects with the `CacheHandle cache()` API from my last email?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>>> Regarding the chance of optimization, it might not be that rare. Some very simple statistics could already help in many cases. For example, simply maintaining the max and min of each field can already eliminate some unnecessary table scans (potentially scanning the cached table) if the result is doomed to be empty. A histogram would give even further information. The optimizer could be very careful and only ignore the cache when it is 100% sure doing so is cheaper, e.g. only when a filter on the cache will absolutely return nothing.
>>>>>>>>>
>>>>>>>>> I do not see how this might be easy to achieve. It would require tons of effort to make it work, and in the end you would still have the problem of comparing/trading CPU cycles vs IO. For example:
>>>>>>>>>
>>>>>>>>> Table src1 = … // read from connector 1
>>>>>>>>> Table src2 = … // read from connector 2
>>>>>>>>>
>>>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …)
>>>>>>>>> a.cache() // write cache to connector 3
>>>>>>>>>
>>>>>>>>> a.filter(…)
>>>>>>>>> env.execute()
>>>>>>>>> a.select(…)
>>>>>>>>>
>>>>>>>>> The decision whether it's better to:
>>>>>>>>> A) read from connector1/connector2, filter/map and join them twice
>>>>>>>>> B) read from connector1/connector2, filter/map and join them once, pay the price of writing to connector 3 and then reading from it
>>>>>>>>>
>>>>>>>>> is very far from trivial. `a` can end up much larger than `src1` and `src2`, writes to connector 3 might be extremely slow, reads from connector 3 can be slower compared to reads from connectors 1 & 2, … . You really need to have extremely good statistics to correctly assess the size of the output, and it would still fail many times (correlations etc.). And keep in mind that at the moment we do not have ANY statistics at all. More than that, it would require significantly more testing and setting up some benchmarks to make sure that we do not break it with some regressions.
>>>>>>>>>
>>>>>>>>> That's why I'm strongly opposing this idea - at least let's not start with this.
>>>>>>>>> If we first start with completely manual/explicit caching, without any magic, it would be a significant improvement for the users at a fraction of the development cost. After implementing that, when we already have all of the working pieces, we can start working on some optimisation rules. As I wrote before, if we start with
>>>>>>>>>
>>>>>>>>> `CachedTable cache()`
>>>>>>>>>
>>>>>>>>> we can later work on follow-up stories to make it automatic. Despite the fact that I don't like this implicit/side-effect approach with a `void` method, having an explicit `CachedTable cache()` wouldn't even prevent us from later adding a `void hintCache()` method, with the exact semantic that you want.
>>>>>>>>>
>>>>>>>>> On top of that, I raise again that having an implicit `void cache()/hintCache()` has other side effects and problems with non-immutable data, and is annoying when used secretly inside methods.
>>>>>>>>>
>>>>>>>>> Explicit `CachedTable cache()` just looks like a much less controversial MVP, and if we decide to go further with this topic, it's not wasted effort, but lies on a straight path to more advanced/complicated solutions in the future. Are there any drawbacks of starting with `CachedTable cache()` that I'm missing?
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Becket,
>>>>>>>>>>
>>>>>>>>>> Introducing CacheHandle seems too complicated. That means users have to maintain the handle properly.
>>>>>>>>>>
>>>>>>>>>> And since cache is just a hint for the optimizer, why not just return the Table itself for the cache method? This hint info should be kept in the Table, I believe.
>>>>>>>>>>
>>>>>>>>>> So how about adding methods cache and uncache for Table, both returning a Table? Because what cache and uncache do is just add some hint info to the Table.
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 12, 2018 at 11:25 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Till and Piotrek,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the clarification. That clears up quite a few confusions. My understanding of how cache works is the same as what Till describes, i.e. cache() is a hint to Flink, but it is not guaranteed that the cache always exists and it might be recomputed from its lineage.
>>>>>>>>>>>
>>>>>>>>>>>> Is this the core of our disagreement here? That you would like this "cache()" to be mostly a hint for the optimiser?
>>>>>>>>>>>
>>>>>>>>>>> Semantic-wise, yes. That's also why I think materialize() has a much larger scope than cache(), and thus should be a different method.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the chance of optimization, it might not be that rare. Some very simple statistics could already help in many cases. For example, simply maintaining the max and min of each field can already eliminate some unnecessary table scans (potentially scanning the cached table) if the result is doomed to be empty. A histogram would give even further information.
>>>>>>>>>>> The optimizer could be very careful and only ignore the cache when it is 100% sure doing so is cheaper, e.g. only when a filter on the cache will absolutely return nothing.
>>>>>>>>>>>
>>>>>>>>>>> Given the above clarification on cache, I would like to revisit the original "void cache()" proposal and see if we can improve on top of that.
>>>>>>>>>>>
>>>>>>>>>>> What do you think about the following modified interface?
>>>>>>>>>>>
>>>>>>>>>>> Table {
>>>>>>>>>>>   /**
>>>>>>>>>>>    * This call hints Flink to maintain a cache of this table and leverage it for performance optimization if needed.
>>>>>>>>>>>    * Note that Flink may still decide to not use the cache if doing so is cheaper.
>>>>>>>>>>>    *
>>>>>>>>>>>    * A CacheHandle will be returned to allow the user to release the cache actively. The cache will be deleted if there
>>>>>>>>>>>    * is no unreleased cache handle to it. When the TableEnvironment is closed, the cache will also be deleted
>>>>>>>>>>>    * and all the cache handles will be released.
>>>>>>>>>>>    *
>>>>>>>>>>>    * @return a CacheHandle referring to the cache of this table.
>>>>>>>>>>>    */
>>>>>>>>>>>   CacheHandle cache();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> CacheHandle {
>>>>>>>>>>>   /**
>>>>>>>>>>>    * Close the cache handle. This method does not necessarily delete the cache. Instead, it simply decrements the reference counter to the cache.
>>>>>>>>>>>    * When there is no handle referring to a cache, the cache will be deleted.
>>>>>>>>>>>    *
>>>>>>>>>>>    * @return the number of open handles to the cache after this handle has been released.
>>>>>>>>>>>    */
>>>>>>>>>>>   int release()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> The rationale behind this interface is the following: in the vast majority of the cases, users wouldn't really care whether the cache is used or not. So I think the most intuitive way is letting cache() return nothing, so that nobody needs to worry about the difference between operations on CachedTables and those on the "original" tables. This will make maybe 99.9% of the users happy. There were two concerns raised for this approach:
>>>>>>>>>>> 1. In some rare cases, users may want to ignore the cache.
>>>>>>>>>>> 2. A table might be cached/uncached in a third party function while the caller does not know.
>>>>>>>>>>>
>>>>>>>>>>> For the first issue, users can use hint("ignoreCache") to explicitly ignore the cache.
>>>>>>>>>>> For the second issue, the above proposal lets cache() return a CacheHandle; the only method on it is release(). Different CacheHandles will refer to the same cache; if a cache no longer has any cache handle, it will be deleted. This will address the following case:
>>>>>>>>>>>
>>>>>>>>>>> {
>>>>>>>>>>>   val handle1 = a.cache()
>>>>>>>>>>>   process(a)
>>>>>>>>>>>   a.select(...) // cache is still available, handle1 has not been released.
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> void process(Table t) {
>>>>>>>>>>>   val handle2 = t.cache() // new handle to the cache
>>>>>>>>>>>   t.select(...) // optimizer decides cache usage
>>>>>>>>>>>   t.hint("ignoreCache").select(...)
>>>>>>>>>>>   // cache is ignored
>>>>>>>>>>>   handle2.release() // release the handle, but the cache may still be available if there are other handles
>>>>>>>>>>>   ...
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Does the above modified approach look reasonable to you?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>
>>>>>>>>>>>> I was aiming at semantics similar to 1. I actually thought that `cache()` would tell the system to materialize the intermediate result so that subsequent queries don't need to reprocess it. This means that the usage of the cached table in this example
>>>>>>>>>>>>
>>>>>>>>>>>> {
>>>>>>>>>>>>   val cachedTable = a.cache()
>>>>>>>>>>>>   val b1 = cachedTable.select(…)
>>>>>>>>>>>>   val b2 = cachedTable.foo().select(…)
>>>>>>>>>>>>   val b3 = cachedTable.bar().select(...)
>>>>>>>>>>>>   val c1 = a.select(…)
>>>>>>>>>>>>   val c2 = a.foo().select(…)
>>>>>>>>>>>>   val c3 = a.bar().select(...)
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> strongly depends on interleaved calls which trigger the execution of sub-queries. So for example, if there is only a single env.execute call at the end of the block, then b1, b2, b3, c1, c2 and c3 would all be computed by reading directly from the sources (given that there is only a single JobGraph). It just happens that the result of `a` will be cached such that we skip the processing of `a` when there are subsequent queries reading from `cachedTable`. If for some reason the system cannot materialize the table (e.g. running out of disk space, TTL expired), then it could also happen that we need to reprocess `a`. In that sense `cachedTable` simply is an identifier for the materialized result of `a`, with the lineage of how to reprocess it.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>   val cachedTable = a.cache()
>>>>>>>>>>>>>>   val b = cachedTable.select(...)
>>>>>>>>>>>>>>   val c = a.select(...)
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
>>>>>>>>>>>>>> Semantic 2. b uses cachedTable as the user demanded. c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, users lose the option to NOT use the cache.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As you can see, neither of the options seems perfect. However, I guess you and Till are proposing the third option:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Semantic 3.
>>>>>>>>>>>>>> b leaves it to the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am pretty sure that me, Till, Fabian and others were all proposing and advocating in favour of semantic "1". No cost-based optimiser decisions at all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> {
>>>>>>>>>>>>>   val cachedTable = a.cache()
>>>>>>>>>>>>>   val b1 = cachedTable.select(…)
>>>>>>>>>>>>>   val b2 = cachedTable.foo().select(…)
>>>>>>>>>>>>>   val b3 = cachedTable.bar().select(...)
>>>>>>>>>>>>>   val c1 = a.select(…)
>>>>>>>>>>>>>   val c2 = a.foo().select(…)
>>>>>>>>>>>>>   val c3 = a.bar().select(...)
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> All of b1, b2 and b3 are reading from the cache, while c1, c2 and c3 are re-executing the whole plan for "a".
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the future we could discuss going one step further, introducing some global optimisation (that can be manually enabled/disabled): deduplicate plan nodes/deduplicate sub-queries/re-use sub-query results/or whatever we could call it. It could do two things:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and share the result using CachedTable - in other words, automatically insert `CachedTable cache()` calls.
>>>>>>>>>>>>> 2. Automatically make the decision to bypass explicit `CachedTable` access (this would be the equivalent of what you described as "semantic 3").
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, as I wrote previously, I have big doubts whether such cost-based optimisation would work (this applies also to "Semantic 2"). I would expect it to do more harm than good in so many cases that it wouldn't make sense. Even assuming that we calculate statistics perfectly (this ain't gonna happen), it's virtually impossible to correctly estimate the exchange rate of CPU cycles vs IO operations, as it changes so much from deployment to deployment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is this the core of our disagreement here? That you would like this "cache()" to be mostly a hint for the optimiser?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another potential concern for semantic 3: in the future, we may add automatic caching to Flink, e.g. caching the intermediate results at the shuffle boundary. If our semantic is that a reference to the original table means skipping the cache, those users may not be able to benefit from the implicit cache.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the reply. Having thought about it again, I might have misunderstood your proposal in earlier emails.
>>>>>>>>>>>>>>> Returning a CachedTable might not be a bad idea.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was more concerned about the semantic and its intuitiveness when a CachedTable is returned, i.e., if cache() returns a CachedTable, what are the semantics in the following code:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>   val cachedTable = a.cache()
>>>>>>>>>>>>>>>   val b = cachedTable.select(...)
>>>>>>>>>>>>>>>   val c = a.select(...)
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What is the difference between b and c? At first glance, I see two options:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
>>>>>>>>>>>>>>> Semantic 2. b uses cachedTable as the user demanded. c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, users lose the option to NOT use the cache.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As you can see, neither of the options seems perfect. However, I guess you and Till are proposing the third option:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This does address all the concerns. It is just that, from an intuitiveness perspective, I found it a little weird to ask users to explicitly use a CachedTable that the optimizer might choose to ignore. That was why I did not think about that semantic. But given there is material benefit, I think this semantic is acceptable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. If we want to let the optimiser make decisions whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism of deciding whether to use the cache or not? If we want to introduce such automated optimisations of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
>>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not-use cache decision.
>>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost-based optimisations would work properly, and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We are absolutely on the same page here.
>>>>>>>>>>>>>>> An explicit cache() method is necessary, not only because the optimizer may not be able to make the right decision, but also because of the nature of interactive programming. For example, if users write the following code in the Scala shell:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val b = a.select(...)
>>>>>>>>>>>>>>> val c = b.select(...)
>>>>>>>>>>>>>>> val d = c.select(...).writeToSink(...)
>>>>>>>>>>>>>>> tEnv.execute()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> there is no way the optimizer can know whether b or c will be used in later code, unless users hint explicitly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At the same time I'm not sure if you have responded to our objections of `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are there any other side effects if we use semantic 3 mentioned above?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Sorry for not responding for a long time.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding case 1:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> There would be no "a.unCache()" method; I would expect only `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn't affect `cachedTableA2`. Just as in any other database, dropping/modifying one independent table/materialised view does not affect others.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What I meant is that, assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. If we want to let the optimiser make decisions whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism of deciding whether to use the cache or not? If we want to introduce such automated optimisations of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
>>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not-use cache decision.
>>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such cost-based optimisations would work properly, and I would still insist first on providing an explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>>>> 4.
>>>>>>>>>>>>>>>> As Till wrote, having an explicit `CachedTable cache()` doesn't contradict future work on automated cost-based caching.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At the same time I'm not sure if you have responded to our objections of `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It is true that after the first job submission there will be no ambiguity in terms of whether a cached table is used or not. That is the same for the cache() without returning a CachedTable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am thinking a little differently. I think it is a hint (as you mentioned later) instead of a new operator. I'd like to be careful about the semantics of the API. A hint is a property set on an existing operator, but it is not itself an operator, as it does not really manipulate the data.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries the user might know better which results need to be cached, because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so and so much space is used). But this should hopefully not contradict `CachedTable cache()`.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree that the cache() method is needed for exactly the reason you mentioned, i.e. Flink cannot predict what users are going to write later, so users need to tell Flink explicitly that this table will be used later. What I meant is that, assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To explain the difference between returning and not returning a CachedTable, I want to compare the following two cases:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Case 1: returning a CachedTable*
>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>>>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> c = a.filter(...) // Does the user specify that the original DAG is used? Or does the optimizer decide whether the DAG or the cache should be used?
>>>>>>>>>>>>>>>>> d = cachedTableA1.filter() // The user specifies that the cached table is used.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards?
>>>>>>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Case 2: not returning a CachedTable*
>>>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In case 1, semantics-wise, the optimizer loses the option to choose between the DAG and the cache. And the unCache() call becomes tricky.
>>>>>>>>>>>>>>>>> In case 2, users do not need to worry about whether the cache or the DAG is used. And the unCache() semantic is clear. However, the caveat is that users cannot explicitly ignore the cache.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In order to address the issues mentioned in case 2, and inspired by the discussion so far, I am thinking about using a hint to allow users to explicitly ignore the cache. We do not have hints yet, but we probably should have them. So the code becomes:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Case 3: returning this table*
>>>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
>>>>>>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // The DAG will be used instead of the cache.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We could also let cache() return this table to allow chained method calls. Do you think this API addresses the concerns?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> All the recent discussions are focused on whether there is a problem if cache() does not return a Table.
>>>>>>>>>>>>>>>>>> It seems that returning a Table explicitly is more clear (and safer?).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So are there any problems if cache() returns a Table? @Becket
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the original DAG that generates a. But all subsequent operators (when running multiple queries) which reference cachedTableA should not need to reproduce `a` but directly consume the intermediate result.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries the user might know better which results need to be cached, because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so and so much space is used). But this should hopefully not contradict `CachedTable cache()`.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little confused.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might become:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> cachedTableA = a.cache()
>>>>>>>>>>>>>>>>>>>> d = cachedTableA.map(...)
e = a.map()

In the above case, if cache() is lazily evaluated, b, c, d and e are all going to read from the original DAG that generates a. But with a naive expectation, d should be reading from the cache. This does not seem to solve the potential confusion you raised, right?

Just to be clear, my understanding is all based on the assumption that the tables are immutable. Therefore, after a.cache(), the cachedTableA and the original table a should be completely interchangeable.

That said, I think a valid argument is optimization. There are indeed cases where reading from the original DAG could be faster than reading from the cache. For example:

a.filter(f1' > 100)
a.cache()
b = a.filter(f1' < 100)

Ideally the optimizer should be intelligent enough to decide which way is faster, without user intervention. In this case, it will identify that b would just be an empty table, and thus skip reading from the cache completely. But I agree that returning a CachedTable would give the user control over when to use the cache, even though I still feel that letting the optimizer handle this is the better option in the long run.

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <trohrm...@apache.org> wrote:

Yes, you are right, Becket, that it still depends on the actual execution of the job whether a consumer reads from a cached result or not.

My point was actually about the properties of a (cached vs. non-cached) and not about the execution. I would not make cache trigger the execution of the job, because one loses some flexibility by eagerly triggering the execution.

I tried to argue for an explicit CachedTable which is returned by the cache() method, like Piotr did, in order to make the API more explicit.

Cheers,
Till
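For contrast, a sketch of the explicit-handle variant argued for here. The CachedTable name follows the thread's examples; everything else is an illustrative assumption:

// Sketch only: the handle variant makes cache reads explicit at each use site.
interface Table {
    Table map(String expression);
    Table filter(String predicate);

    // Returns a distinct handle instead of changing this table's properties.
    CachedTable cache();
}

interface CachedTable extends Table {
    // Dropping the cache is tied to the handle, not to the original table.
    void uncache();
}

Under this shape, d = cachedTableA.map(...) unambiguously consumes the cached result, while e = a.map(...) unambiguously consumes the original DAG.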
On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <becket....@gmail.com> wrote:

Hi Till,

That is a good example. Just a minor correction: in this case, b, c and d will all consume from a non-cached a. This is because the cache will only be created on the very first job submission that generates the table to be cached.

If I understand correctly, this example is about whether the .cache() method should be eagerly or lazily evaluated. In other words, if cache() actually triggered a job that creates the cache, there would be no such confusion. Is that right?

In the example, although d will not consume from the cached Table while it looks like it is supposed to, from a correctness perspective the code will still return the correct result, assuming that tables are immutable.

Personally I feel it is OK, because users probably won't really worry about whether the table is cached or not. And a lazy cache could avoid some unnecessary caching if a cached table is never created in the user application. But I am not opposed to eager evaluation of cache.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrm...@apache.org> wrote:

Another argument for Piotr's point is that lazily changing the properties of a node affects all downstream consumers, but does not necessarily have to happen before these consumers are defined. From a user's perspective this can be quite confusing:

b = a.map(...)
c = a.map(...)

a.cache()
d = a.map(...)

now b, c and d will consume from a cached operator. In this case, the user would most likely expect that only d reads from a cached result.

Cheers,
Till
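To spell out the lazy-evaluation timeline Becket describes, in the thread's own pseudocode style (the split into two submissions is an assumption about how such a program would execute, not a specified behavior):

b = a.map(...)   // defined before any execution
a.cache()        // lazy: only marks a for caching, no job runs here
d = a.map(...)   // also defined before any execution

env.execute()    // 1st submission: computes a from the original DAG and
                 // populates the cache; b and d both read the original DAG

e = a.map(...)
env.execute()    // 2nd submission: e can consume the cached a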
On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hey Shaoxuan and Becket,

> Can you explain a bit more about what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?

Not only that. There are also performance implications, and those are another implicit side effect of using `void cache()`. As I wrote before, reading from the cache might not always be desirable, so it can cause performance degradation, and I'm fine with that - it is the user's or the optimiser's choice. What I do not like is that this implicit side effect can manifest in a completely different part of the code that wasn't touched by the user while he was adding the `void cache()` call somewhere else. And even if caching improves performance, it's still a side effect of `void cache()`. Almost by definition, `void` methods have only side effects. As I wrote before, there are a couple of scenarios where this might be undesirable and/or unexpected, for example:

1.
Table b = …;
b.cache()
x = b.join(…)
y = b.count()
// ...
// a
// hundred
// lines
// of
// code
// later
z = b.filter(…).groupBy(…) // this might even be hidden in a different method/file/package/dependency

2.

Table b = ...
if (some_condition) {
  foo(b)
} else {
  bar(b)
}
z = b.filter(…).groupBy(…)

void foo(Table b) {
  b.cache()
  // do something with b
}

In both of the above examples, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` (the program's semantics, if the sources are mutable, and its performance), which might be far from obvious.
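For comparison, a sketch of example 2 with a handle-returning cache(), reusing the hypothetical CachedTable shape from earlier in the thread. The point is that the caching becomes visible in foo's signature and at the call site, instead of being a hidden side effect:

// Sketch only, with assumed types:
CachedTable foo(Table b) {
    CachedTable cached = b.cache();
    // do something with `cached`
    return cached;
}

CachedTable cachedB = foo(b);
z1 = cachedB.filter(…).groupBy(…)  // explicitly reads the cache
z2 = b.filter(…).groupBy(…)        // explicitly reads the original DAG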
On top of that, there is still my argument that having a `MaterializedTable` or `CachedTable` handle is more flexible for us in the future and for the user (as a manual option to bypass cache reads).

> But Jiangjie is correct, the source table in batching should be immutable. It is the user's responsibility to ensure it; otherwise even a regular failover may lead to inconsistent results.

Yes, I agree, that's what a perfect world / good deployment should be. But it often isn't, and while I'm not trying to fix this (since the proper fix is to support transactions), I'm just trying to minimise confusion for the users who are not fully aware of what's going on and operate in a less than perfect setup. And if something bites them after adding a `b.cache()` call, I want to make sure that they at least know all of the places that adding this line can affect.

Thanks, Piotrek

On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

Thanks again for the clarification. Some more replies follow.

> But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching.

It is true.
Actually in stream processing, cache() >>>> has >>>>>>> the >>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>>> semantic >>>>>>>>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>>>>>> batch processing. The semantic is following: >>>>>>>>>>>>>>>>>>>>>>>>> For a table created via a series of computation, >> save >>>>>>> that >>>>>>>>>>>>>>>>>>> table >>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>>>> later >>>>>>>>>>>>>>>>>>>>>>>>> reference to avoid running the computation logic to >>>>>>>>>>>>>>>>>> regenerate >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> table. >>>>>>>>>>>>>>>>>>>>>>>>> Once the application exits, drop all the cache. >>>>>>>>>>>>>>>>>>>>>>>>> This semantic is same for both batch and stream >>>>>>>>>>> processing. >>>>>>>>>>>>>>>>>> The >>>>>>>>>>>>>>>>>>>>>>>> difference >>>>>>>>>>>>>>>>>>>>>>>>> is that stream applications will only run once as >>>> they >>>>>>> are >>>>>>>>>>>>>>>>>> long >>>>>>>>>>>>>>>>>>>>>>> running. >>>>>>>>>>>>>>>>>>>>>>>>> And the batch applications may be run multiple >> times, >>>>>>>>>>> hence >>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> cache >>>>>>>>>>>>>>>>>>>>>>> may >>>>>>>>>>>>>>>>>>>>>>>>> be created and dropped each time the application >>>> runs. >>>>>>>>>>>>>>>>>>>>>>>>> Admittedly, there will probably be some resource >>>>>>>>>>> management >>>>>>>>>>>>>>>>>>>>>>> requirements >>>>>>>>>>>>>>>>>>>>>>>>> for the streaming cached table, such as time based >> / >>>>>>> size >>>>>>>>>>>>>>>>>> based >>>>>>>>>>>>>>>>>>>>>>>> retention, >>>>>>>>>>>>>>>>>>>>>>>>> to address the infinite data issue. But such >>>>>>> requirement >>>>>>>>>>>> does >>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>>>> change >>>>>>>>>>>>>>>>>>>>>>>>> the semantic. >>>>>>>>>>>>>>>>>>>>>>>>> You are right that interactive programming is just >>>> one >>>>>>> use >>>>>>>>>>>>>>>>>> case >>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>>> cache(). >>>>>>>>>>>>>>>>>>>>>>>>> It is not the only use case. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For me the more important issue is of not having >> the >>>>>>> `void >>>>>>>>>>>>>>>>>>>> cache()` >>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>>>>> side effects. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> This is indeed the key point. The argument around >>>>>>> whether >>>>>>>>>>>>>>>>>>> cache() >>>>>>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>>>>> return something already indicates that cache() and >>>>>>>>>>>>>>>>>>> materialize() >>>>>>>>>>>>>>>>>>>>>>> address >>>>>>>>>>>>>>>>>>>>>>>>> different issues. >>>>>>>>>>>>>>>>>>>>>>>>> Can you explain a bit more one what are the side >>>>>>> effects? >>>>>>>>>>> So >>>>>>>>>>>>>>>>>>> far >>>>>>>>>>>>>>>>>>>> my >>>>>>>>>>>>>>>>>>>>>>>>> understanding is that such side effects only exist >>>> if a >>>>>>>>>>>> table >>>>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>>>> mutable. >>>>>>>>>>>>>>>>>>>>>>>>> Is that the case? >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I don’t know, probably initially we should make >>>>>>>>>>> CachedTable >>>>>>>>>>>>>>>>>>>>>> read-only. >>>>>>>>>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>>>>>>>> don’t find it more confusing than the fact that >> user >>>>>>> can >>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>> write >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>> views >>>>>>>>>>>>>>>>>>>>>>>>>> or materialised views in SQL or that user >> currently >>>>>>> can >>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>> write >>>>>>>>>>>>>>>>>>>>>> to a >>>>>>>>>>>>>>>>>>>>>>>>>> Table. 
I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering about is the following two facts:

1. If and only if a table is mutable (with something like insert()), a CachedTable may have implicit behavior.
2. A CachedTable extends a Table.

From these we can come to the conclusion that a CachedTable is mutable and users can insert into the CachedTable directly. This is where I thought it was confusing.

Thanks,

Jiangjie (Becket) Qin
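Becket's two facts can be seen directly in the type hierarchy. A compilable sketch, where insert() is a hypothetical stand-in for "the table is mutable" (it is not an existing API):

// Sketch only: if Table were mutable and CachedTable extends Table, the
// mutation method is inherited by the cache handle whether we want it or not.
interface Table {
    Table filter(String predicate);
    void insert(String values); // hypothetical mutation method
}

interface CachedTable extends Table {
    // Nothing declared here, yet cachedTable.insert(...) still compiles,
    // which is exactly the confusing conclusion described above.
}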
On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi all,

Regarding naming `cache()` vs `materialize()`: one more explanation of why `materialize()` is more natural to me is that I think of all "Table"s in the Table API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session, which is limited by the different execution model. That's why "caching" a view, for me, is just materialising it.

However, I see and understand your point of view. Coming from DataSet/DataStream and the generally non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching. Naming is one issue, though, and not that critical to me. Especially since, once we implement proper materialised views, we can always deprecate/rename `cache()` if we deem it so.

For me the more important issue is not having a `void cache()` with side effects, exactly for the reasons that you have mentioned. True: results might be non-deterministic if the underlying source tables are changing. The problem is that `void cache()` implicitly changes the semantics of subsequent uses of the cached/materialized Table. It can cause a "wtf" moment for a user if he inserts a "b.cache()" call in some place in his code and suddenly some other random places behave differently. If `materialize()` or `cache()` returns a Table handle, we force the user to use the cache explicitly, which removes the "random" part from the "suddenly some other random places are behaving differently".

This argument and the others that I've raised (greater flexibility / allowing the user to explicitly bypass the cache) are independent of the `cache()` vs `materialize()` discussion.

> Does that mean one can also insert into the CachedTable? This sounds pretty confusing.

I don’t know, probably initially we should make CachedTable read-only. I don’t find it more confusing than the fact that a user cannot write to views or materialised views in SQL, or that a user currently cannot write to a Table.

Piotrek

On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

I agree with @Becket that `cache()` and `materialize()` should be considered as two different methods, where the latter is more sophisticated.
According to my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but as the Table API is a high-level API, it's natural for us to think in a SQL way.

Maybe we can add the `cache()` method to the DataSet API and force users to translate a Table to a DataSet before caching it. Then the users would manually register the cached dataset as a table again (we may need some table replacement mechanism for datasets with an identical schema but different contents here). After all, it's the dataset rather than the dynamic table that needs to be cached, right?

Best,
Xingcan
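A sketch of the round-trip Xingcan describes, against the batch Table API of that time (imports omitted; DataSet#cache() is the proposed addition and does not exist, while the rest follows the then-current BatchTableEnvironment methods):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

Table t = tEnv.scan("source").groupBy("...").select("...");

// Translate the table into a DataSet of rows.
DataSet<Row> rows = tEnv.toDataSet(t, Row.class);

// Hypothetical: the proposed cache() on DataSet.
DataSet<Row> cachedRows = rows.cache();

// Re-register the cached dataset under a new table name.
tEnv.registerDataSet("cached_t", cachedRows);
Table cachedT = tEnv.scan("cached_t");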
On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek and Jark,

Thanks for the feedback and explanation. Those are good arguments, but I think they are mostly about materialized views. Let me try to explain why I believe cache() and materialize() are different.

I think cache() and materialize() have quite different implications. An analogy I can think of is save() vs. publish(). When users call cache(), it is just like saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning. Calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), that means "I have something meaningful to be reused by others", and now users need to think about the validation, update & versioning, lifecycle of the result, etc.

Piotrek's suggestions on variations of the materialize() method are very useful. It would be great if Flink had them. The concept of a materialized view is actually a pretty big feature, not to mention the related things like the triggers/hooks you mentioned earlier. I think the materialized view itself should be discussed in a more thorough and systematic manner. And I find that discussion to be kind of orthogonal to, and way beyond, the interactive programming experience.

The example you gave was interesting. I still have some questions, though.

> Table source = … // some source that scans files from a directory “/foo/bar/“
> Table t1 = source.groupBy(…).select(…).where(…) ….;
> Table t2 = t1.materialize() // (or `cache()`)
>
> t2.count() // initialise cache (if it’s lazily initialised)
> int a1 = t1.count()
> int b1 = t2.count()
> // something in the background (or we trigger it) writes new files to /foo/bar
> int a2 = t1.count()
> int b2 = t2.count()
> t2.refresh() // possible future extension, not to be implemented in the initial version

What if someone else added some more files to /foo/bar at this point? In that case, a3 won't equal b3, and the result becomes non-deterministic, right?
> int a3 = t1.count()
> int b3 = t2.count()
> t2.drop() // another possible future extension, manual “cache” dropping

When we talk about interactive programming, in most cases we are talking about batch applications. A fundamental assumption in such cases is that the source data is complete before the data processing begins, and that the data will not change during the processing. IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added.
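In the thread's pseudocode style, that union approach would look like the following; lateRows is an illustrative name, and unionAll is the existing Table API set operation:

lateRows = ...                               // a table holding the rows to be added
effectiveSource = source.unionAll(lateRows)  // union instead of mutating source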
In that case, the processing logic >>>>>>> that >>>>>>>>>>>>>>>>>>> derives >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>> original >>>>>>>>>>>>>>>>>>>>>>>>>>>> data needs to be executed repeatedly to update >>>> those >>>>>>>>>>>>>>>>>>>>>> reports/views. >>>>>>>>>>>>>>>>>>>>>>>>>> Again, >>>>>>>>>>>>>>>>>>>>>>>>>>>> all those derived data also need to ha >>>>>>> >>>>>>> >>>> >>>> >>>> >> >>