Hi,

I think that introducing ref counting could be confusing and it will be error 
prone, since Flink-table’s users are not used to closing/releasing resources. I 
was more objecting placing the `uncache()`/`dropCache()`/`releaseCache()` 
(releaseCache sounds best to me) as a method in the “Table”. It might be not 
obvious that it will drop the cache for all of the usages of the given table. 
For example:

public void foo(Table t) {
 // …
 t.releaseCache();
}

public void bar(Table t) {
  // ...
}

Table a = …
val cachedA = a.cache()

foo(cachedA)
bar(cachedA)


My problem with above example is that `t.releaseCache()` call is not doing the 
best possible job in communicating to the user that it will have a side effects 
for other places, like `bar(cachedA)` call. Something like this might be a 
better (not perfect, but just a bit better):

public void foo(Table t, CacheService cacheService) {
 // …
 cacheService.releaseCacheFor(t);
}

Table a = …
val cachedA = a.cache()

foo(cachedA, env.getCacheService())
bar(cachedA)


Also from another perspective, maybe placing `releaseCache()` method in Table 
might not be the best separation of concerns - `releaseCache()` method seams 
significantly different compared to other existing methods.

Piotrek

> On 8 Jan 2019, at 12:28, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Piotr,
> 
> You are right. There might be two intuitive meanings when users call
> 'a.uncache()', namely:
> 1. release the resource
> 2. Do not use cache for the next operation.
> 
> Case (1) would likely be the dominant use case. So I would suggest we
> dedicate uncache() method to case (1), i.e. for resource release, but not
> for ignoring cache.
> 
> For case 2, i.e. explicitly ignoring cache (which is rare), users may use
> something like 'hint("ignoreCache")'. I think this is better as it is a
> little weird for users to call `a.uncache()` while they may not even know
> if the table is cached at all.
> 
> Assuming we let `uncache()` to only release resource, one possibility is
> using ref count to mitigate the side effect. That means a ref count is
> incremented on `cache()` and decremented on `uncache()`. That means
> `uncache()` does not physically release the resource immediately, but just
> means the cache could be released.
> That being said, I am not sure if this is really a better solution as it
> seems a little counter intuitive. Maybe calling it releaseCache() help a
> little bit?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> 
> On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski <pi...@da-platform.com> wrote:
> 
>> Hi Becket,
>> 
>> With `uncache` there are probably two features that we can think about:
>> 
>> a)
>> 
>> Physically dropping the cached table from the storage, freeing up the
>> resources
>> 
>> b)
>> 
>> Hinting the optimizer to not cache the reads for the next query/table
>> 
>> a) Has the issue as I wrote before, that it seemed to be an operation
>> inherently “flawed" with having side effects.
>> 
>> I’m not sure how it would be best to express. We could make it work:
>> 
>> 1. via a method on a Table as you proposed:
>> 
>> void Table#dropCache()
>> void Table#uncache()
>> 
>> 2. Operation on the environment
>> 
>> env.dropCacheFor(table) // or some other argument that allows user to
>> identify the desired cache
>> 
>> 3. Extending (from your original design doc) `setTableService` method to
>> return some control handle like:
>> 
>> TableServiceControl setTableService(TableFactory tf,
>>                     TableProperties properties,
>>                     TempTableCleanUpCallback cleanUpCallback);
>> 
>> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
>> 
>> And having the drop cache method there:
>> 
>> TableServiceControl#dropCache(table)
>> 
>> Out of those options, option 1 might have a disadvantage of kind of not
>> making the user aware, that this is a global operation with side effects.
>> Like the old example of:
>> 
>> public void foo(Table t) {
>>  // …
>>  t.dropCache();
>> }
>> 
>> It might not be immediately obvious that `t.dropCache()` is some kind of
>> global operation, with side effects visible outside of the `foo` function.
>> 
>> On the other hand, both option 2 and 3, might have greater chance of
>> catching user’s attention:
>> 
>> public void foo(Table t, CacheService cacheService) {
>>  // …
>>  cacheService.dropCache(t);
>> }
>> 
>> b) could be achieved quite easily:
>> 
>> Table a = …
>> val notCached1 = a.doNotCache()
>> val cachedA = a.cache()
>> val notCached2 = cachedA.doNotCache() // equivalent of notCached1
>> 
>> `doNotCache()` would behave similarly to `cache()` - return a copy of the
>> table with removed “cache” hint and/or added “never cache” hint.
>> 
>> Piotrek
>> 
>> 
>>> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks for the proposal and detailed explanation. I like the idea of
>>> returning a new hinted Table without modifying the original table. This
>>> also leave the room for users to benefit from future implicit caching.
>>> 
>>> Just to make sure I get the full picture. In your proposal, there will
>> also
>>> be a 'void Table#uncache()' method to release the cache, right?
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com>
>>> wrote:
>>> 
>>>> Hi Becket!
>>>> 
>>>> After further thinking I tend to agree that my previous proposal
>> (*Option
>>>> 2*) indeed might not be if would in the future introduce automatic
>> caching.
>>>> However I would like to propose a slightly modified version of it:
>>>> 
>>>> *Option 4*
>>>> 
>>>> Adding `cache()` method with following signature:
>>>> 
>>>> Table Table#cache();
>>>> 
>>>> Without side-effects, and `cache()` call do not modify/change original
>>>> Table in any way.
>>>> It would return a copy of original table, with added hint for the
>>>> optimizer to cache the table, so that the future accesses to the
>> returned
>>>> table might be cached or not.
>>>> 
>>>> Assuming that we are talking about a setup, where we do not have
>> automatic
>>>> caching enabled (possible future extension).
>>>> 
>>>> Example #1:
>>>> 
>>>> ```
>>>> Table a = …
>>>> a.foo() // not cached
>>>> 
>>>> val cachedTable = a.cache();
>>>> 
>>>> cachedA.bar() // maybe cached
>>>> a.foo() // same as before - effectively not cached
>>>> ```
>>>> 
>>>> Both the first and the second `a.foo()` operations would behave in the
>>>> exactly same way. Again, `a.cache()` call doesn’t affect `a` itself. If
>> `a`
>>>> was not hinted for caching before `a.cache();`, then both `a.foo()`
>> calls
>>>> wouldn’t use cache.
>>>> 
>>>> Returned `cachedA` would be hinted with “cache” hint, so probably
>>>> `cachedA.bar()` would go through cache (unless optimiser decides the
>>>> opposite)
>>>> 
>>>> Example #2
>>>> 
>>>> ```
>>>> Table a = …
>>>> 
>>>> a.foo() // not cached
>>>> 
>>>> val b = a.cache();
>>>> 
>>>> a.foo() // same as before - effectively not cached
>>>> b.foo() // maybe cached
>>>> 
>>>> val c = b.cache();
>>>> 
>>>> a.foo() // same as before - effectively not cached
>>>> b.foo() // same as before - effectively maybe cached
>>>> c.foo() // maybe cached
>>>> ```
>>>> 
>>>> Now, assuming that we have some future “automatic caching optimisation”:
>>>> 
>>>> Example #3
>>>> 
>>>> ```
>>>> env.enableAutomaticCaching()
>>>> Table a = …
>>>> 
>>>> a.foo() // might be cached, depending if `a` was selected to automatic
>>>> caching
>>>> 
>>>> val b = a.cache();
>>>> 
>>>> a.foo() // same as before - might be cached, if `a` was selected to
>>>> automatic caching
>>>> b.foo() // maybe cached
>>>> ```
>>>> 
>>>> 
>>>> More or less this is the same behaviour as:
>>>> 
>>>> Table a = ...
>>>> val b = a.filter(x > 20)
>>>> 
>>>> calling `filter` hasn’t changed or altered `a` in anyway. If `a` was
>>>> previously filtered:
>>>> 
>>>> Table src = …
>>>> val a = src.filter(x > 20)
>>>> val b = a.filter(x > 20)
>>>> 
>>>> then yes, `a` and `b` will be the same. But the point is that neither
>>>> `filter` nor `cache` changes the original `a` table.
>>>> 
>>>> One thing is that indeed, physically dropping cache operation, will have
>>>> side effects and it will in a way mutate the cached table references.
>> But
>>>> this is I think unavoidable in any solution - the same issue as calling
>>>> `.close()`, or calling destructor in C++.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
>>>>> 
>>>>> Happy New Year, everybody!
>>>>> 
>>>>> I would like to resume this discussion thread. At this point, We have
>>>>> agreed on the first step goal of interactive programming. The open
>>>>> discussion is the exact API. More specifically, what should *cache()*
>>>>> method return and what is the semantic. There are three options:
>>>>> 
>>>>> *Option 1*
>>>>> *void cache()* OR *Table cache()* which returns the original table for
>>>>> chained calls.
>>>>> *void uncache() *releases the cache.
>>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>>>> 
>>>>> - Semantic: a.cache() hints that table 'a' should be cached. Optimizer
>>>>> decides whether the cache will be used or not.
>>>>> - pros: simple and no confusion between CachedTable and original table
>>>>> - cons: A table may be cached / uncached in a method invocation, while
>>>> the
>>>>> caller does not know about this.
>>>>> 
>>>>> *Option 2*
>>>>> *CachedTable cache()*
>>>>> *CachedTable *extends *Table *with an additional *uncache()* method
>>>>> 
>>>>> - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will
>> always
>>>>> use cache. *a.bar() *will always use original DAG.
>>>>> - pros: No potential side effects in method invocation.
>>>>> - cons: Optimizer has no chance to kick in. Future optimization will
>>>> become
>>>>> a behavior change and need users to change the code.
>>>>> 
>>>>> *Option 3*
>>>>> *CacheHandle cache()*
>>>>> *CacheHandle.release() *to release a cache handle on the table. If all
>>>>> cache handles are released, the cache could be removed.
>>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>>>> 
>>>>> - Semantic: *a.cache() *hints that 'a' should be cached. Optimizer
>>>> decides
>>>>> whether the cache will be used or not. Cache is released either no
>> handle
>>>>> is on it, or the user program exits.
>>>>> - pros: No potential side effect in method invocation. No confusion
>>>> between
>>>>> cached table v.s original table.
>>>>> - cons: An additional CacheHandle exposed to the users.
>>>>> 
>>>>> 
>>>>> Personally I prefer option 3 for the following reasons:
>>>>> 1. It is simple. Vast majority of the users would just call
>>>>> *a.cache()* followed
>>>>> by *a.foo(),* *a.bar(), etc. *
>>>>> 2. There is no semantic ambiguity and semantic change if we decide to
>> add
>>>>> implicit cache in the future.
>>>>> 3. There is no side effect in the method calls.
>>>>> 4. Admittedly we need to expose one more CacheHandle class to the
>> users.
>>>>> But it is not that difficult to understand given similar well known
>>>> concept
>>>>> like ref count (we can name it CacheReference if that is easier to
>>>>> understand). So I think it is fine.
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> 
>>>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> Hi Piotrek,
>>>>>> 
>>>>>> 1. Regarding optimization.
>>>>>> Sure there are many cases that the decision is hard to make. But that
>>>> does
>>>>>> not make it any easier for the users to make those decisions. I
>> imagine
>>>> 99%
>>>>>> of the users would just naively use cache. I am not saying we can
>>>> optimize
>>>>>> in all the cases. But as long as we agree that at least in certain
>>>> cases (I
>>>>>> would argue most cases), optimizer can do a little better than an
>>>> average
>>>>>> user who likely knows little about Flink internals, we should not push
>>>> the
>>>>>> burden of optimization to users.
>>>>>> 
>>>>>> BTW, it seems some of your concerns are related to the
>> implementation. I
>>>>>> did not mention the implementation of the caching service because that
>>>>>> should not affect the API semantic. Not sure if this helps, but
>> imagine
>>>> the
>>>>>> default implementation has one StorageNode service colocating with
>> each
>>>> TM.
>>>>>> It could be running within the TM process or in a standalone process,
>>>>>> depending on configuration.
>>>>>> 
>>>>>> The StorageNode uses memory + spill-to-disk mechanism. The cached data
>>>>>> will just be written to the local StorageNode service. If the
>>>> StorageNode
>>>>>> is running within the TM process, the in-memory cache could just be
>>>> objects
>>>>>> so we save some serde cost. A later job referring to the cached Table
>>>> will
>>>>>> be scheduled in a locality aware manner, i.e. run in the TM whose peer
>>>>>> StorageNode hosts the data.
>>>>>> 
>>>>>> 
>>>>>> 2. Semantic
>>>>>> I am not sure why introducing a new hintCache() or
>>>>>> env.enableAutomaticCaching() method would avoid the consequence of
>>>> semantic
>>>>>> change.
>>>>>> 
>>>>>> If the auto optimization is not enabled by default, users still need
>> to
>>>>>> make code change to all existing programs in order to get the benefit.
>>>>>> If the auto optimization is enabled by default, advanced users who
>> know
>>>>>> that they really want to use cache will suddenly lose the opportunity
>>>> to do
>>>>>> so, unless they change the code to disable auto optimization.
>>>>>> 
>>>>>> 
>>>>>> 3. side effect
>>>>>> The CacheHandle is not only for where to put uncache(). It is to solve
>>>> the
>>>>>> implicit performance impact by moving the uncache() to the
>> CacheHandle.
>>>>>> 
>>>>>> - If users wants to leverage cache, they can call a.cache(). After
>>>>>> that, unless user explicitly release that CacheHandle, a.foo() will
>>>> always
>>>>>> leverage cache if needed (optimizer may choose to ignore cache if
>> that
>>>>>> helps accelerate the process). Any function call will not be able to
>>>>>> release the cache because they do not have that CacheHandle.
>>>>>> - If some advanced users do not want to use cache at all, they will
>>>>>> call a.hint(ignoreCache).foo(). This will for sure ignore cache and
>>>> use the
>>>>>> original DAG to process.
>>>>>> 
>>>>>> 
>>>>>>> In vast majority of the cases, users wouldn't really care whether the
>>>>>>> cache is used or not.
>>>>>>> I wouldn’t agree with that, because “caching” (if not purely in
>> memory
>>>>>>> caching) would add additional IO costs. It’s similar as saying that
>>>> users
>>>>>>> would not see a difference between Spark/Flink and MapReduce
>> (MapReduce
>>>>>>> writes data to disks after every map/reduce stage).
>>>>>> 
>>>>>> What I wanted to say is that in most cases, after users call cache(),
>>>> they
>>>>>> don't really care about whether auto optimization has decided to
>> ignore
>>>> the
>>>>>> cache or not, as long as the program runs faster.
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jiangjie (Becket) Qin
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <
>>>> pi...@data-artisans.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Thanks for the quick answer :)
>>>>>>> 
>>>>>>> Re 1.
>>>>>>> 
>>>>>>> I generally agree with you, however couple of points:
>>>>>>> 
>>>>>>> a) the problem with using automatic caching is bigger, because you
>> will
>>>>>>> have to decide, how do you compare IO vs CPU costs and if you pick
>>>> wrong,
>>>>>>> additional IO costs might be enormous or even can crash your system.
>>>> This
>>>>>>> is more difficult problem compared to let say join reordering, where
>>>> the
>>>>>>> only issue is to have good statistics that can capture correlations
>>>> between
>>>>>>> columns (when you reorder joins number of IO operations do not
>> change)
>>>>>>> c) your example is completely independent of caching.
>>>>>>> 
>>>>>>> Query like this:
>>>>>>> 
>>>>>>> src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 ===`f2).as('f3,
>>>>>>> …).filter(‘f3 > 30)
>>>>>>> 
>>>>>>> Should/could be optimised to empty result immediately, without the
>> need
>>>>>>> for any cache/materialisation and that should work even without any
>>>>>>> statistics provided by the connector.
>>>>>>> 
>>>>>>> For me prerequisite to any serious cost-based optimisations would be
>>>> some
>>>>>>> reasonable benchmark coverage of the code (tpch?). Otherwise that
>>>> would be
>>>>>>> equivalent of adding not tested code, since we wouldn’t be able to
>>>> verify
>>>>>>> our assumptions, like how does the writing of 10 000 records to
>>>>>>> cache/RocksDB/Kafka/CSV file compare to joining/filtering/processing
>> of
>>>>>>> lets say 1000 000 rows.
>>>>>>> 
>>>>>>> Re 2.
>>>>>>> 
>>>>>>> I wasn’t proposing to change the semantic later. I was proposing that
>>>> we
>>>>>>> start now:
>>>>>>> 
>>>>>>> CachedTable cachedA = a.cache()
>>>>>>> cachedA.foo() // Cache is used
>>>>>>> a.bar() // Original DAG is used
>>>>>>> 
>>>>>>> And then later we can think about adding for example
>>>>>>> 
>>>>>>> CachedTable cachedA = a.hintCache()
>>>>>>> cachedA.foo() // Cache might be used
>>>>>>> a.bar() // Original DAG is used
>>>>>>> 
>>>>>>> Or
>>>>>>> 
>>>>>>> env.enableAutomaticCaching()
>>>>>>> a.foo() // Cache might be used
>>>>>>> a.bar() // Cache might be used
>>>>>>> 
>>>>>>> Or (I would still not like this option):
>>>>>>> 
>>>>>>> a.hintCache()
>>>>>>> a.foo() // Cache might be used
>>>>>>> a.bar() // Cache might be used
>>>>>>> 
>>>>>>> Or whatever else that will come to our mind. Even if we add some
>>>>>>> automatic caching in the future, keeping implicit (`CachedTable
>>>> cache()`)
>>>>>>> caching will still be useful, at least in some cases.
>>>>>>> 
>>>>>>> Re 3.
>>>>>>> 
>>>>>>>> 2. The source tables are immutable during one run of batch
>> processing
>>>>>>> logic.
>>>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>>>> 
>>>>>>>> I think assumption 2 and 3 are by definition what batch processing
>>>>>>> means,
>>>>>>>> i.e the data must be complete before it is processed and should not
>>>>>>> change
>>>>>>>> when the processing is running.
>>>>>>> 
>>>>>>> I agree that this is how batch systems SHOULD be working. However I
>>>> know
>>>>>>> from my previous experience that it’s not always the case. Sometimes
>>>> users
>>>>>>> are just working on some non transactional storage, which can be
>>>> (either
>>>>>>> constantly or occasionally) being modified by some other processes
>> for
>>>>>>> whatever the reasons (fixing the data, updating, adding new data
>> etc).
>>>>>>> 
>>>>>>> But even if we ignore this point (data immutability), performance
>> side
>>>>>>> effect issue of your proposal remains. If user calls `void a.cache()`
>>>> deep
>>>>>>> inside some private method, it will have implicit side effects on
>> other
>>>>>>> parts of his program that might not be obvious.
>>>>>>> 
>>>>>>> Re `CacheHandle`.
>>>>>>> 
>>>>>>> If I understand it correctly, it only addresses the issue where to
>>>> place
>>>>>>> method `uncache`/`dropCache`.
>>>>>>> 
>>>>>>> Btw,
>>>>>>> 
>>>>>>>> In vast majority of the cases, users wouldn't really care whether
>> the
>>>>>>> cache is used or not.
>>>>>>> 
>>>>>>> I wouldn’t agree with that, because “caching” (if not purely in
>> memory
>>>>>>> caching) would add additional IO costs. It’s similar as saying that
>>>> users
>>>>>>> would not see a difference between Spark/Flink and MapReduce
>> (MapReduce
>>>>>>> writes data to disks after every map/reduce stage).
>>>>>>> 
>>>>>>> Piotrek
>>>>>>> 
>>>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Piotrek,
>>>>>>>> 
>>>>>>>> Not sure if you noticed, in my last email, I was proposing
>>>> `CacheHandle
>>>>>>>> cache()` to avoid the potential side effect due to function calls.
>>>>>>>> 
>>>>>>>> Let's look at the disagreement in your reply one by one.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 1. Optimization chances
>>>>>>>> 
>>>>>>>> Optimization is never a trivial work. This is exactly why we should
>>>> not
>>>>>>> let
>>>>>>>> user manually do that. Databases have done huge amount of work in
>> this
>>>>>>>> area. At Alibaba, we rely heavily on many optimization rules to
>> boost
>>>>>>> the
>>>>>>>> SQL query performance.
>>>>>>>> 
>>>>>>>> In your example, if I filling the filter conditions in a certain
>> way,
>>>>>>> the
>>>>>>>> optimization would become obvious.
>>>>>>>> 
>>>>>>>> Table src1 = … // read from connector 1
>>>>>>>> Table src2 = … // read from connector 2
>>>>>>>> 
>>>>>>>> Table a = src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 ===
>>>>>>>> `f2).as('f3, ...)
>>>>>>>> a.cache() // write cache to connector 3, when writing the records,
>>>>>>> remember
>>>>>>>> min and max of `f1
>>>>>>>> 
>>>>>>>> a.filter('f3 > 30) // There is no need to read from any connector
>>>>>>> because
>>>>>>>> `a` does not contain any record whose 'f3 is greater than 30.
>>>>>>>> env.execute()
>>>>>>>> a.select(…)
>>>>>>>> 
>>>>>>>> BTW, it seems to me that adding some basic statistics is fairly
>>>>>>>> straightforward and the cost is pretty marginal if not ignorable. In
>>>>>>> fact
>>>>>>>> it is not only needed for optimization, but also for cases such as
>> ML,
>>>>>>>> where some algorithms may need to decide their parameter based on
>> the
>>>>>>>> statistics of the data.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 2. Same API, one semantic now, another semantic later.
>>>>>>>> 
>>>>>>>> I am trying to understand what is the semantic of `CachedTable
>>>> cache()`
>>>>>>> you
>>>>>>>> are proposing. IMO, we should avoid designing an API whose semantic
>>>>>>> will be
>>>>>>>> changed later. If we have a "CachedTable cache()" method, then the
>>>>>>> semantic
>>>>>>>> should be very clearly defined upfront and do not change later. It
>>>>>>> should
>>>>>>>> never be "right now let's go with semantic 1, later we can silently
>>>>>>> change
>>>>>>>> it to semantic 2 or 3". Such change could result in bad consequence.
>>>> For
>>>>>>>> example, let's say we decide go with semantic 1:
>>>>>>>> 
>>>>>>>> CachedTable cachedA = a.cache()
>>>>>>>> cachedA.foo() // Cache is used
>>>>>>>> a.bar() // Original DAG is used.
>>>>>>>> 
>>>>>>>> Now majority of the users would be using cachedA.foo() in their
>> code.
>>>>>>> And
>>>>>>>> some advanced users will use a.bar() to explicitly skip the cache.
>>>> Later
>>>>>>>> on, we added smart optimization and change the semantic to semantic
>> 2:
>>>>>>>> 
>>>>>>>> CachedTable cachedA = a.cache()
>>>>>>>> cachedA.foo() // Cache is used
>>>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip cache
>> if
>>>>>>> it is
>>>>>>>> faster.
>>>>>>>> 
>>>>>>>> Now most of the users who were writing cachedA.foo() will not
>> benefit
>>>>>>> from
>>>>>>>> this optimization at all, unless they change their code to use
>> a.foo()
>>>>>>>> instead. And those advanced users suddenly lose the option to
>>>> explicitly
>>>>>>>> ignore cache unless they change their code (assuming we care enough
>> to
>>>>>>>> provide something like hint(useCache)). If we don't define the
>>>> semantic
>>>>>>>> carefully, our users will have to change their code again and again
>>>>>>> while
>>>>>>>> they shouldn't have to.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 3. side effect.
>>>>>>>> 
>>>>>>>> Before we talk about side effect, we have to agree on the
>> assumptions.
>>>>>>> The
>>>>>>>> assumptions I have are following:
>>>>>>>> 1. We are talking about batch processing.
>>>>>>>> 2. The source tables are immutable during one run of batch
>> processing
>>>>>>> logic.
>>>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>>>>> 
>>>>>>>> I think assumption 2 and 3 are by definition what batch processing
>>>>>>> means,
>>>>>>>> i.e the data must be complete before it is processed and should not
>>>>>>> change
>>>>>>>> when the processing is running.
>>>>>>>> 
>>>>>>>> As far as I am aware of, I don't know any batch processing system
>>>>>>> breaking
>>>>>>>> those assumptions. Even for relational database tables, where
>> queries
>>>>>>> can
>>>>>>>> run with concurrent modifications, necessary locking are still
>>>> required
>>>>>>> to
>>>>>>>> ensure the integrity of the query result.
>>>>>>>> 
>>>>>>>> Please let me know if you disagree with the above assumptions. If
>> you
>>>>>>> agree
>>>>>>>> with these assumptions, with the `CacheHandle cache()` API in my
>> last
>>>>>>>> email, do you still see side effects?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <
>>>> pi...@data-artisans.com
>>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Becket,
>>>>>>>>> 
>>>>>>>>>> Regarding the chance of optimization, it might not be that rare.
>>>> Some
>>>>>>>>> very
>>>>>>>>>> simple statistics could already help in many cases. For example,
>>>>>>> simply
>>>>>>>>>> maintaining max and min of each fields can already eliminate some
>>>>>>>>>> unnecessary table scan (potentially scanning the cached table) if
>>>> the
>>>>>>>>>> result is doomed to be empty. A histogram would give even further
>>>>>>>>>> information. The optimizer could be very careful and only ignores
>>>>>>> cache
>>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a
>> filter
>>>> on
>>>>>>>>> the
>>>>>>>>>> cache will absolutely return nothing.
>>>>>>>>> 
>>>>>>>>> I do not see how this might be easy to achieve. It would require
>> tons
>>>>>>> of
>>>>>>>>> effort to make it work and in the end you would still have a
>> problem
>>>> of
>>>>>>>>> comparing/trading CPU cycles vs IO. For example:
>>>>>>>>> 
>>>>>>>>> Table src1 = … // read from connector 1
>>>>>>>>> Table src2 = … // read from connector 2
>>>>>>>>> 
>>>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …)
>>>>>>>>> a.cache() // write cache to connector 3
>>>>>>>>> 
>>>>>>>>> a.filter(…)
>>>>>>>>> env.execute()
>>>>>>>>> a.select(…)
>>>>>>>>> 
>>>>>>>>> Decision whether it’s better to:
>>>>>>>>> A) read from connector1/connector2, filter/map and join them twice
>>>>>>>>> B) read from connector1/connector2, filter/map and join them once,
>>>> pay
>>>>>>> the
>>>>>>>>> price of writing to connector 3 and then reading from it
>>>>>>>>> 
>>>>>>>>> Is very far from trivial. `a` can end up much larger than `src1`
>> and
>>>>>>>>> `src2`, writes to connector 3 might be extremely slow, reads from
>>>>>>> connector
>>>>>>>>> 3 can be slower compared to reads from connector 1 & 2, … . You
>>>> really
>>>>>>> need
>>>>>>>>> to have extremely good statistics to correctly asses size of the
>>>>>>> output and
>>>>>>>>> it would still be failing many times (correlations etc). And keep
>> in
>>>>>>> mind
>>>>>>>>> that at the moment we do not have ANY statistics at all. More than
>>>>>>> that, it
>>>>>>>>> would require significantly more testing and setting up some
>>>>>>> benchmarks to
>>>>>>>>> make sure that we do not brake it with some regressions.
>>>>>>>>> 
>>>>>>>>> That’s why I’m strongly opposing this idea - at least let’s not
>>>> starts
>>>>>>>>> with this. If we first start with completely manual/explicit
>> caching,
>>>>>>>>> without any magic, it would be a significant improvement for the
>>>> users
>>>>>>> for
>>>>>>>>> a fraction of the development cost. After implementing that, when
>> we
>>>>>>>>> already have all of the working pieces, we can start working on
>> some
>>>>>>>>> optimisations rules. As I wrote before, if we start with
>>>>>>>>> 
>>>>>>>>> `CachedTable cache()`
>>>>>>>>> 
>>>>>>>>> We can later work on follow up stories to make it automatic.
>> Despite
>>>>>>> that
>>>>>>>>> I don’t like this implicit/side effect approach with `void` method,
>>>>>>> having
>>>>>>>>> explicit `CachedTable cache()` wouldn’t even prevent as from later
>>>>>>> adding
>>>>>>>>> `void hintCache()` method, with the exact semantic that you want.
>>>>>>>>> 
>>>>>>>>> On top of that I re-rise again that having implicit `void
>>>>>>>>> cache()/hintCache()` has other side effects and problems with non
>>>>>>> immutable
>>>>>>>>> data, and being annoying when used secretly inside methods.
>>>>>>>>> 
>>>>>>>>> Explicit `CachedTable cache()` just looks like much less
>>>> controversial
>>>>>>> MVP
>>>>>>>>> and if we decide to go further with this topic, it’s not a wasted
>>>>>>> effort,
>>>>>>>>> but just lies on a stright path to more advanced/complicated
>>>> solutions
>>>>>>> in
>>>>>>>>> the future. Are there any drawbacks of starting with `CachedTable
>>>>>>> cache()`
>>>>>>>>> that I’m missing?
>>>>>>>>> 
>>>>>>>>> Piotrek
>>>>>>>>> 
>>>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Becket,
>>>>>>>>>> 
>>>>>>>>>> Introducing CacheHandle seems too complicated. That means users
>> have
>>>>>>> to
>>>>>>>>>> maintain Handler properly.
>>>>>>>>>> 
>>>>>>>>>> And since cache is just a hint for optimizer, why not just return
>>>>>>> Table
>>>>>>>>>> itself for cache method. This hint info should be kept in Table I
>>>>>>>>> believe.
>>>>>>>>>> 
>>>>>>>>>> So how about adding method cache and uncache for Table, and both
>>>>>>> return
>>>>>>>>>> Table. Because what cache and uncache did is just adding some hint
>>>>>>> info
>>>>>>>>>> into Table.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Becket Qin <becket....@gmail.com> 于2018年12月12日周三 上午11:25写道:
>>>>>>>>>> 
>>>>>>>>>>> Hi Till and Piotrek,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the clarification. That solves quite a few confusion.
>> My
>>>>>>>>>>> understanding of how cache works is same as what Till describe.
>>>> i.e.
>>>>>>>>>>> cache() is a hint to Flink, but it is not guaranteed that cache
>>>>>>> always
>>>>>>>>>>> exist and it might be recomputed from its lineage.
>>>>>>>>>>> 
>>>>>>>>>>> Is this the core of our disagreement here? That you would like
>> this
>>>>>>>>>>>> “cache()” to be mostly hint for the optimiser?
>>>>>>>>>>> 
>>>>>>>>>>> Semantic wise, yes. That's also why I think materialize() has a
>>>> much
>>>>>>>>> larger
>>>>>>>>>>> scope than cache(), thus it should be a different method.
>>>>>>>>>>> 
>>>>>>>>>>> Regarding the chance of optimization, it might not be that rare.
>>>> Some
>>>>>>>>> very
>>>>>>>>>>> simple statistics could already help in many cases. For example,
>>>>>>> simply
>>>>>>>>>>> maintaining max and min of each fields can already eliminate some
>>>>>>>>>>> unnecessary table scan (potentially scanning the cached table) if
>>>> the
>>>>>>>>>>> result is doomed to be empty. A histogram would give even further
>>>>>>>>>>> information. The optimizer could be very careful and only ignores
>>>>>>> cache
>>>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a
>> filter
>>>>>>> on
>>>>>>>>> the
>>>>>>>>>>> cache will absolutely return nothing.
>>>>>>>>>>> 
>>>>>>>>>>> Given the above clarification on cache, I would like to revisit
>> the
>>>>>>>>>>> original "void cache()" proposal and see if we can improve on top
>>>> of
>>>>>>>>> that.
>>>>>>>>>>> 
>>>>>>>>>>> What do you think about the following modified interface?
>>>>>>>>>>> 
>>>>>>>>>>> Table {
>>>>>>>>>>> /**
>>>>>>>>>>> * This call hints Flink to maintain a cache of this table and
>>>>>>> leverage
>>>>>>>>>>> it for performance optimization if needed.
>>>>>>>>>>> * Note that Flink may still decide to not use the cache if it is
>>>>>>>>> cheaper
>>>>>>>>>>> by doing so.
>>>>>>>>>>> *
>>>>>>>>>>> * A CacheHandle will be returned to allow user release the cache
>>>>>>>>>>> actively. The cache will be deleted if there
>>>>>>>>>>> * is no unreleased cache handlers to it. When the
>> TableEnvironment
>>>>>>> is
>>>>>>>>>>> closed. The cache will also be deleted
>>>>>>>>>>> * and all the cache handlers will be released.
>>>>>>>>>>> *
>>>>>>>>>>> * @return a CacheHandle referring to the cache of this table.
>>>>>>>>>>> */
>>>>>>>>>>> CacheHandle cache();
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> CacheHandle {
>>>>>>>>>>> /**
>>>>>>>>>>> * Close the cache handle. This method does not necessarily
>> deletes
>>>>>>> the
>>>>>>>>>>> cache. Instead, it simply decrements the reference counter to the
>>>>>>> cache.
>>>>>>>>>>> * When the there is no handle referring to a cache. The cache
>> will
>>>>>>> be
>>>>>>>>>>> deleted.
>>>>>>>>>>> *
>>>>>>>>>>> * @return the number of open handles to the cache after this
>> handle
>>>>>>>>> has
>>>>>>>>>>> been released.
>>>>>>>>>>> */
>>>>>>>>>>> int release()
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> The rationale behind this interface is following:
>>>>>>>>>>> In vast majority of the cases, users wouldn't really care whether
>>>> the
>>>>>>>>> cache
>>>>>>>>>>> is used or not. So I think the most intuitive way is letting
>>>> cache()
>>>>>>>>> return
>>>>>>>>>>> nothing. So nobody needs to worry about the difference between
>>>>>>>>> operations
>>>>>>>>>>> on CacheTables and those on the "original" tables. This will make
>>>>>>> maybe
>>>>>>>>>>> 99.9% of the users happy. There were two concerns raised for this
>>>>>>>>> approach:
>>>>>>>>>>> 1. In some rare cases, users may want to ignore cache,
>>>>>>>>>>> 2. A table might be cached/uncached in a third party function
>> while
>>>>>>> the
>>>>>>>>>>> caller does not know.
>>>>>>>>>>> 
>>>>>>>>>>> For the first issue, users can use hint("ignoreCache") to
>>>> explicitly
>>>>>>>>> ignore
>>>>>>>>>>> cache.
>>>>>>>>>>> For the second issue, the above proposal lets cache() return a
>>>>>>>>> CacheHandle,
>>>>>>>>>>> the only method in it is release(). Different CacheHandles will
>>>>>>> refer to
>>>>>>>>>>> the same cache, if a cache no longer has any cache handle, it
>> will
>>>> be
>>>>>>>>>>> deleted. This will address the following case:
>>>>>>>>>>> {
>>>>>>>>>>> val handle1 = a.cache()
>>>>>>>>>>> process(a)
>>>>>>>>>>> a.select(...) // cache is still available, handle1 has not been
>>>>>>>>> released.
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> void process(Table t) {
>>>>>>>>>>> val handle2 = t.cache() // new handle to cache
>>>>>>>>>>> t.select(...) // optimizer decides cache usage
>>>>>>>>>>> t.hint("ignoreCache").select(...) // cache is ignored
>>>>>>>>>>> handle2.release() // release the handle, but the cache may still
>> be
>>>>>>>>>>> available if there are other handles
>>>>>>>>>>> ...
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> Does the above modified approach look reasonable to you?
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <
>>>> trohrm...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>> 
>>>>>>>>>>>> I was aiming at semantics similar to 1. I actually thought that
>>>>>>>>> `cache()`
>>>>>>>>>>>> would tell the system to materialize the intermediate result so
>>>> that
>>>>>>>>>>>> subsequent queries don't need to reprocess it. This means that
>> the
>>>>>>>>> usage
>>>>>>>>>>> of
>>>>>>>>>>>> the cached table in this example
>>>>>>>>>>>> 
>>>>>>>>>>>> {
>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>> val b1 = cachedTable.select(…)
>>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>>>>>>>>>>>> val c1 = a.select(…)
>>>>>>>>>>>> val c2 = a.foo().select(…)
>>>>>>>>>>>> val c3 = a.bar().select(...)
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> strongly depends on interleaved calls which trigger the
>> execution
>>>> of
>>>>>>>>> sub
>>>>>>>>>>>> queries. So for example, if there is only a single env.execute
>>>> call
>>>>>>> at
>>>>>>>>>>> the
>>>>>>>>>>>> end of  block, then b1, b2, b3, c1, c2 and c3 would all be
>>>> computed
>>>>>>> by
>>>>>>>>>>>> reading directly from the sources (given that there is only a
>>>> single
>>>>>>>>>>>> JobGraph). It just happens that the result of `a` will be cached
>>>>>>> such
>>>>>>>>>>> that
>>>>>>>>>>>> we skip the processing of `a` when there are subsequent queries
>>>>>>> reading
>>>>>>>>>>>> from `cachedTable`. If for some reason the system cannot
>>>> materialize
>>>>>>>>> the
>>>>>>>>>>>> table (e.g. running out of disk space, ttl expired), then it
>> could
>>>>>>> also
>>>>>>>>>>>> happen that we need to reprocess `a`. In that sense
>> `cachedTable`
>>>>>>>>> simply
>>>>>>>>>>> is
>>>>>>>>>>>> an identifier for the materialized result of `a` with the
>> lineage
>>>>>>> how
>>>>>>>>> to
>>>>>>>>>>>> reprocess it.
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <
>>>>>>>>> pi...@data-artisans.com
>>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
>>>>>>> original
>>>>>>>>>>> DAG
>>>>>>>>>>>>> as
>>>>>>>>>>>>>> user demanded so. In this case, the optimizer has no chance to
>>>>>>>>>>>> optimize.
>>>>>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves
>> the
>>>>>>>>>>>>> optimizer
>>>>>>>>>>>>>> to choose whether the cache or DAG should be used. In this
>> case,
>>>>>>> user
>>>>>>>>>>>>> lose
>>>>>>>>>>>>>> the option to NOT use cache.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> As you can see, neither of the options seem perfect. However,
>> I
>>>>>>> guess
>>>>>>>>>>>> you
>>>>>>>>>>>>>> and Till are proposing the third option:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or
>>>> DAG
>>>>>>>>>>>> should
>>>>>>>>>>>>> be
>>>>>>>>>>>>>> used. c always use the DAG.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am pretty sure that me, Till, Fabian and others were all
>>>>>>> proposing
>>>>>>>>>>> and
>>>>>>>>>>>>> advocating in favour of semantic “1”. No cost based optimiser
>>>>>>>>> decisions
>>>>>>>>>>>> at
>>>>>>>>>>>>> all.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> {
>>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>>> val b1 = cachedTable.select(…)
>>>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>>>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>>>>>>>>>>>>> val c1 = a.select(…)
>>>>>>>>>>>>> val c2 = a.foo().select(…)
>>>>>>>>>>>>> val c3 = a.bar().select(...)
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> All b1, b2 and b3 are reading from cache, while c1, c2 and c3
>> are
>>>>>>>>>>>>> re-executing whole plan for “a”.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> In the future we could discuss going one step further,
>>>> introducing
>>>>>>>>> some
>>>>>>>>>>>>> global optimisation (that can be manually enabled/disabled):
>>>>>>>>>>> deduplicate
>>>>>>>>>>>>> plan nodes/deduplicate sub queries/re-use sub queries
>> results/or
>>>>>>>>>>> whatever
>>>>>>>>>>>>> we could call it. It could do two things:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and
>>>> share
>>>>>>>>> the
>>>>>>>>>>>>> result using CachedTable - in other words automatically insert
>>>>>>>>>>>> `CachedTable
>>>>>>>>>>>>> cache()` calls.
>>>>>>>>>>>>> 2. Automatically make decision to bypass explicit `CachedTable`
>>>>>>> access
>>>>>>>>>>>>> (this would be the equivalent of what you described as
>> “semantic
>>>>>>> 3”).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> However as I wrote previously, I have big doubts if such
>>>> cost-based
>>>>>>>>>>>>> optimisation would work (this applies also to “Semantic 2”). I
>>>>>>> would
>>>>>>>>>>>> expect
>>>>>>>>>>>>> it to do more harm than good in so many cases, that it wouldn’t
>>>>>>> make
>>>>>>>>>>>> sense.
>>>>>>>>>>>>> Even assuming that we calculate statistics perfectly (this
>> ain’t
>>>>>>> gonna
>>>>>>>>>>>>> happen), it’s virtually impossible to correctly estimate
>> correct
>>>>>>>>>>> exchange
>>>>>>>>>>>>> rate of CPU cycles vs IO operations as it is changing so much
>>>> from
>>>>>>>>>>>>> deployment to deployment.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Is this the core of our disagreement here? That you would like
>>>> this
>>>>>>>>>>>>> “cache()” to be mostly hint for the optimiser?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Another potential concern for semantic 3 is that. In the
>> future,
>>>>>>> we
>>>>>>>>>>> may
>>>>>>>>>>>>> add
>>>>>>>>>>>>>> automatic caching to Flink. e.g. cache the intermediate
>> results
>>>> at
>>>>>>>>>>> the
>>>>>>>>>>>>>> shuffle boundary. If our semantic is that reference to the
>>>>>>> original
>>>>>>>>>>>> table
>>>>>>>>>>>>>> means skipping cache, those users may not be able to benefit
>>>> from
>>>>>>> the
>>>>>>>>>>>>>> implicit cache.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <
>>>> becket....@gmail.com
>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the reply. Thought about it again, I might have
>>>>>>>>>>>> misunderstood
>>>>>>>>>>>>>>> your proposal in earlier emails. Returning a CachedTable
>> might
>>>>>>> not
>>>>>>>>>>> be
>>>>>>>>>>>> a
>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>> idea.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I was more concerned about the semantic and its intuitiveness
>>>>>>> when a
>>>>>>>>>>>>>>> CachedTable is returned. i..e, if cache() returns
>> CachedTable.
>>>>>>> What
>>>>>>>>>>>> are
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> semantic in the following code:
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> What is the difference between b and c? At the first glance,
>> I
>>>>>>> see
>>>>>>>>>>> two
>>>>>>>>>>>>>>> options:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
>>>>>>> original
>>>>>>>>>>>> DAG
>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> user demanded so. In this case, the optimizer has no chance
>> to
>>>>>>>>>>>> optimize.
>>>>>>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves
>>>> the
>>>>>>>>>>>>> optimizer
>>>>>>>>>>>>>>> to choose whether the cache or DAG should be used. In this
>>>> case,
>>>>>>>>>>> user
>>>>>>>>>>>>> lose
>>>>>>>>>>>>>>> the option to NOT use cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> As you can see, neither of the options seem perfect.
>> However, I
>>>>>>>>>>> guess
>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> and Till are proposing the third option:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or
>>>> DAG
>>>>>>>>>>>> should
>>>>>>>>>>>>>>> be used. c always use the DAG.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> This does address all the concerns. It is just that from
>>>>>>>>>>> intuitiveness
>>>>>>>>>>>>>>> perspective, I found that asking user to explicitly use a
>>>>>>>>>>> CachedTable
>>>>>>>>>>>>> while
>>>>>>>>>>>>>>> the optimizer might choose to ignore is a little weird. That
>>>> was
>>>>>>>>>>> why I
>>>>>>>>>>>>> did
>>>>>>>>>>>>>>> not think about that semantic. But given there is material
>>>>>>> benefit,
>>>>>>>>>>> I
>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> this semantic is acceptable.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
>>>>>>> cache
>>>>>>>>>>> or
>>>>>>>>>>>>> not,
>>>>>>>>>>>>>>>> then why do we need “void cache()” method at all? Would It
>>>>>>>>>>>> “increase”
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> chance of using the cache? That’s sounds strange. What would
>>>> be
>>>>>>> the
>>>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
>>>>>>> want
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
>>>>>>>>>>>>> deduplication”
>>>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>>>>>>> optimiser
>>>>>>>>>>> do
>>>>>>>>>>>>> all of
>>>>>>>>>>>>>>>> the work.
>>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not
>> use
>>>>>>>>>>> cache
>>>>>>>>>>>>>>>> decision.
>>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether
>> such
>>>>>>> cost
>>>>>>>>>>>>> based
>>>>>>>>>>>>>>>> optimisations would work properly and I would still insist
>>>>>>> first on
>>>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> We are absolutely on the same page here. An explicit cache()
>>>>>>> method
>>>>>>>>>>> is
>>>>>>>>>>>>>>> necessary not only because optimizer may not be able to make
>>>> the
>>>>>>>>>>> right
>>>>>>>>>>>>>>> decision, but also because of the nature of interactive
>>>>>>> programming.
>>>>>>>>>>>> For
>>>>>>>>>>>>>>> example, if users write the following code in Scala shell:
>>>>>>>>>>>>>>> val b = a.select(...)
>>>>>>>>>>>>>>> val c = b.select(...)
>>>>>>>>>>>>>>> val d = c.select(...).writeToSink(...)
>>>>>>>>>>>>>>> tEnv.execute()
>>>>>>>>>>>>>>> There is no way optimizer will know whether b or c will be
>> used
>>>>>>> in
>>>>>>>>>>>> later
>>>>>>>>>>>>>>> code, unless users hint explicitly.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our
>>>>>>>>>>> objections
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
>>>>>>> Jark,
>>>>>>>>>>>>> Fabian,
>>>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Is there any other side effects if we use semantic 3
>> mentioned
>>>>>>>>>>> above?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> JIangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
>>>>>>>>>>>> pi...@data-artisans.com
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Sorry for not responding long time.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regarding case1.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> There wouldn’t be no “a.unCache()” method, but I would
>> expect
>>>>>>> only
>>>>>>>>>>>>>>>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1`
>> wouldn’t
>>>>>>>>>>> affect
>>>>>>>>>>>>>>>> `cachedTableA2`. Just as in any other database dropping
>>>>>>> modifying
>>>>>>>>>>> one
>>>>>>>>>>>>>>>> independent table/materialised view does not affect others.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> What I meant is that assuming there is already a cached
>>>> table,
>>>>>>>>>>>> ideally
>>>>>>>>>>>>>>>> users need
>>>>>>>>>>>>>>>>> not to specify whether the next query should read from the
>>>>>>> cache
>>>>>>>>>>> or
>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
>>>>>>> cache
>>>>>>>>>>> or
>>>>>>>>>>>>>>>> not, then why do we need “void cache()” method at all? Would
>>>> It
>>>>>>>>>>>>> “increase”
>>>>>>>>>>>>>>>> the chance of using the cache? That’s sounds strange. What
>>>>>>> would be
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
>>>>>>> want
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
>>>>>>>>>>>>> deduplication”
>>>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>>>>>>> optimiser
>>>>>>>>>>> do
>>>>>>>>>>>>> all of
>>>>>>>>>>>>>>>> the work.
>>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not
>> use
>>>>>>>>>>> cache
>>>>>>>>>>>>>>>> decision.
>>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether
>> such
>>>>>>> cost
>>>>>>>>>>>>> based
>>>>>>>>>>>>>>>> optimisations would work properly and I would still insist
>>>>>>> first on
>>>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()`
>>>> doesn’t
>>>>>>>>>>>>>>>> contradict future work on automated cost based caching.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our
>>>>>>>>>>> objections
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
>>>>>>> Jark,
>>>>>>>>>>>>> Fabian,
>>>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It is true that after the first job submission, there will
>> be
>>>>>>> no
>>>>>>>>>>>>>>>> ambiguity
>>>>>>>>>>>>>>>>> in terms of whether a cached table is used or not. That is
>>>> the
>>>>>>>>>>> same
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> cache() without returning a CachedTable.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
>>>>>>> caching
>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>> from which you need to consume from if you want to benefit
>>>>>>> from
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>>>> functionality.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I am thinking a little differently. I think it is a hint
>> (as
>>>>>>> you
>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful
>>>> about
>>>>>>> the
>>>>>>>>>>>>>>>> semantic
>>>>>>>>>>>>>>>>> of the API. A hint is a property set on an existing
>> operator,
>>>>>>> but
>>>>>>>>>>> is
>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> itself an operator as it does not really manipulate the
>> data.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision
>>>>>>> which
>>>>>>>>>>>>>>>>>> intermediate result should be cached. But especially when
>>>>>>>>>>> executing
>>>>>>>>>>>>>>>> ad-hoc
>>>>>>>>>>>>>>>>>> queries the user might better know which results need to
>> be
>>>>>>>>>>> cached
>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
>>>>>>> consider
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
>>>> the
>>>>>>>>>>>> future
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>> might add functionality which tries to automatically cache
>>>>>>>>>>> results
>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>>> caching the latest intermediate results until so and so
>> much
>>>>>>>>>>> space
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> used). But this should hopefully not contradict with
>>>>>>> `CachedTable
>>>>>>>>>>>>>>>> cache()`.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree that cache() method is needed for exactly the
>> reason
>>>>>>> you
>>>>>>>>>>>>>>>> mentioned,
>>>>>>>>>>>>>>>>> i.e. Flink cannot predict what users are going to write
>>>> later,
>>>>>>> so
>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>> need to tell Flink explicitly that this table will be used
>>>>>>> later.
>>>>>>>>>>>>> What I
>>>>>>>>>>>>>>>>> meant is that assuming there is already a cached table,
>>>> ideally
>>>>>>>>>>>> users
>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>> not to specify whether the next query should read from the
>>>>>>> cache
>>>>>>>>>>> or
>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> To explain the difference between returning / not
>> returning a
>>>>>>>>>>>>>>>> CachedTable,
>>>>>>>>>>>>>>>>> I want compare the following two case:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> *Case 1:  returning a CachedTable*
>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>>>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> c = a.filter(...) // User specify that the original DAG is
>>>>>>> used?
>>>>>>>>>>> Or
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> optimizer decides whether DAG or cache should be used?
>>>>>>>>>>>>>>>>> d = cachedTableA1.filter() // User specify that the cached
>>>>>>> table
>>>>>>>>>>> is
>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards?
>>>>>>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> *Case 2: not returning a CachedTable*
>>>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
>>>> DAG
>>>>>>>>>>>> should
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or
>>>> DAG
>>>>>>>>>>>> should
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> In case 1, semantic wise, optimizer lose the option to
>> choose
>>>>>>>>>>>> between
>>>>>>>>>>>>>>>> DAG
>>>>>>>>>>>>>>>>> and cache. And the unCache() call becomes tricky.
>>>>>>>>>>>>>>>>> In case 2, users do not need to worry about whether cache
>> or
>>>>>>> DAG
>>>>>>>>>>> is
>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>>> And the unCache() semantic is clear. However, the caveat is
>>>>>>> that
>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>> cannot explicitly ignore the cache.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> In order to address the issues mentioned in case 2 and
>>>>>>> inspired by
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> discussion so far, I am thinking about using hint to allow
>>>> user
>>>>>>>>>>>>>>>> explicitly
>>>>>>>>>>>>>>>>> ignore cache. Although we do not have hint yet, but we
>>>> probably
>>>>>>>>>>>> should
>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> one. So the code becomes:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> *Case 3: returning this table*
>>>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
>>>> DAG
>>>>>>>>>>>> should
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used
>>>>>>> instead
>>>>>>>>>>> of
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> We could also let cache() return this table to allow
>> chained
>>>>>>>>>>> method
>>>>>>>>>>>>>>>> calls.
>>>>>>>>>>>>>>>>> Do you think this API addresses the concerns?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> All the recent discussions are focused on whether there
>> is a
>>>>>>>>>>>> problem
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>> cache() not return a Table.
>>>>>>>>>>>>>>>>>> It seems that returning a Table explicitly is more clear
>>>> (and
>>>>>>>>>>>> safe?).
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> So whether there are any problems if cache() returns a
>>>> Table?
>>>>>>>>>>>>> @Becket
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <
>>>>>>> trohrm...@apache.org
>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the
>>>> original
>>>>>>> DAG
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> generates a. But all subsequent operators (when running
>>>>>>> multiple
>>>>>>>>>>>>>>>> queries)
>>>>>>>>>>>>>>>>>>> which reference cachedTableA should not need to reproduce
>>>> `a`
>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>> directly
>>>>>>>>>>>>>>>>>>> consume the intermediate result.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
>>>>>>> caching
>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>> from which you need to consume from if you want to
>> benefit
>>>>>>> from
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>>>>> functionality.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of
>> decision
>>>>>>> which
>>>>>>>>>>>>>>>>>>> intermediate result should be cached. But especially when
>>>>>>>>>>>> executing
>>>>>>>>>>>>>>>>>> ad-hoc
>>>>>>>>>>>>>>>>>>> queries the user might better know which results need to
>> be
>>>>>>>>>>> cached
>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
>>>>>>>>>>> consider
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
>>>> the
>>>>>>>>>>>> future
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> might add functionality which tries to automatically
>> cache
>>>>>>>>>>> results
>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>>>> caching the latest intermediate results until so and so
>>>> much
>>>>>>>>>>> space
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> used). But this should hopefully not contradict with
>>>>>>>>>>> `CachedTable
>>>>>>>>>>>>>>>>>> cache()`.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <
>>>>>>> becket....@gmail.com
>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little
>>>> confused.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might
>>>> become:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> cachedTableA = a.cache()
>>>>>>>>>>>>>>>>>>>> d = cachedTableA.map(...)
>>>>>>>>>>>>>>>>>>>> e = a.map()
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> In the above case, if cache() is lazily evaluated, b,
>> c, d
>>>>>>> and
>>>>>>>>>>> e
>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> going to be reading from the original DAG that generates
>>>> a.
>>>>>>> But
>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> naive expectation, d should be reading from the cache.
>>>> This
>>>>>>>>>>> seems
>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> solving the potential confusion you raised, right?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Just to be clear, my understanding are all based on the
>>>>>>>>>>>> assumption
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> tables are immutable. Therefore, after a.cache(), a the
>>>>>>>>>>>>>>>> c*achedTableA*
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> original table *a * should be completely
>> interchangeable.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> That said, I think a valid argument is optimization.
>> There
>>>>>>> are
>>>>>>>>>>>>> indeed
>>>>>>>>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>>>>>> that reading from the original DAG could be faster than
>>>>>>> reading
>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> cache. For example, in the following example:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> a.filter(f1' > 100)
>>>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>>>> b = a.filter(f1' < 100)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Ideally the optimizer should be intelligent enough to
>>>> decide
>>>>>>>>>>>> which
>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> faster, without user intervention. In this case, it will
>>>>>>>>>>> identify
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> b
>>>>>>>>>>>>>>>>>>>> would just be an empty table, thus skip reading from the
>>>>>>> cache
>>>>>>>>>>>>>>>>>>> completely.
>>>>>>>>>>>>>>>>>>>> But I agree that returning a CachedTable would give user
>>>> the
>>>>>>>>>>>>> control
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> when to use cache, even though I still feel that letting
>>>> the
>>>>>>>>>>>>>>>> optimizer
>>>>>>>>>>>>>>>>>>>> handle this is a better option in long run.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <
>>>>>>>>>>>> trohrm...@apache.org
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Yes you are right Becket that it still depends on the
>>>>>>> actual
>>>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> the job whether a consumer reads from a cached result
>> or
>>>>>>> not.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> My point was actually about the properties of a (cached
>>>> vs.
>>>>>>>>>>>>>>>>>> non-cached)
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> not about the execution. I would not make cache trigger
>>>> the
>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> the job because one loses some flexibility by eagerly
>>>>>>>>>>> triggering
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> execution.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I tried to argue for an explicit CachedTable which is
>>>>>>> returned
>>>>>>>>>>>> by
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> cache() method like Piotr did in order to make the API
>>>> more
>>>>>>>>>>>>>>>> explicit.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <
>>>>>>>>>>> becket....@gmail.com
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> That is a good example. Just a minor correction, in
>> this
>>>>>>>>>>> case,
>>>>>>>>>>>>> b, c
>>>>>>>>>>>>>>>>>>>> and d
>>>>>>>>>>>>>>>>>>>>>> will all consume from a non-cached a. This is because
>>>>>>> cache
>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> created on the very first job submission that
>> generates
>>>>>>> the
>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> cached.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, this is example is about
>>>>>>> whether
>>>>>>>>>>>>>>>>>> .cache()
>>>>>>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>>>>>>>>>>> should be eagerly evaluated or lazily evaluated. In
>>>>>>> another
>>>>>>>>>>>> word,
>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>> cache() method actually triggers a job that creates
>> the
>>>>>>>>>>> cache,
>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>> be no such confusion. Is that right?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In the example, although d will not consume from the
>>>>>>> cached
>>>>>>>>>>>> Table
>>>>>>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> looks supposed to, from correctness perspective the
>> code
>>>>>>> will
>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>>>>>>>> correct result, assuming that tables are immutable.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Personally I feel it is OK because users probably
>> won't
>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>> worry
>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>> whether the table is cached or not. And lazy cache
>> could
>>>>>>>>>>> avoid
>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>> unnecessary caching if a cached table is never created
>>>> in
>>>>>>> the
>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>> application. But I am not opposed to do eager
>> evaluation
>>>>>>> of
>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <
>>>>>>>>>>>>>>>>>> trohrm...@apache.org>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Another argument for Piotr's point is that lazily
>>>>>>> changing
>>>>>>>>>>>>>>>>>>> properties
>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>> node affects all down stream consumers but does not
>>>>>>>>>>>> necessarily
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> happen before these consumers are defined. From a
>>>> user's
>>>>>>>>>>>>>>>>>>> perspective
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> can be quite confusing:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>>>>>>> d = a.map(...)
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> now b, c and d will consume from a cached operator.
>> In
>>>>>>> this
>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>>> would most likely expect that only d reads from a
>>>> cached
>>>>>>>>>>>> result.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <
>>>>>>>>>>>>>>>>>>>>> pi...@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hey Shaoxuan and Becket,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Can you explain a bit more one what are the side
>>>>>>> effects?
>>>>>>>>>>> So
>>>>>>>>>>>>>>>>>>> far
>>>>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>>> understanding is that such side effects only exist
>>>> if a
>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> mutable.
>>>>>>>>>>>>>>>>>>>>>>>>> Is that the case?
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Not only that. There are also performance
>> implications
>>>>>>> and
>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>> another implicit side effects of using `void
>> cache()`.
>>>>>>> As I
>>>>>>>>>>>>>>>>>> wrote
>>>>>>>>>>>>>>>>>>>>>> before,
>>>>>>>>>>>>>>>>>>>>>>>> reading from cache might not always be desirable,
>> thus
>>>>>>> it
>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> cause
>>>>>>>>>>>>>>>>>>>>>>>> performance degradation and I’m fine with that -
>>>> user's
>>>>>>> or
>>>>>>>>>>>>>>>>>>>>> optimiser’s
>>>>>>>>>>>>>>>>>>>>>>>> choice. What I do not like is that this implicit
>> side
>>>>>>>>>>> effect
>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> manifest
>>>>>>>>>>>>>>>>>>>>>>>> in completely different part of code, that wasn’t
>>>>>>> touched
>>>>>>>>>>> by
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>>>>>>>>>>> he was adding `void cache()` call somewhere else.
>> And
>>>>>>> even
>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>>>>>>>>>> improves performance, it’s still a side effect of
>>>> `void
>>>>>>>>>>>>>>>>>> cache()`.
>>>>>>>>>>>>>>>>>>>>>> Almost
>>>>>>>>>>>>>>>>>>>>>>>> from the definition `void` methods have only side
>>>>>>> effects.
>>>>>>>>>>>> As I
>>>>>>>>>>>>>>>>>>>> wrote
>>>>>>>>>>>>>>>>>>>>>>>> before, there are couple of scenarios where this
>> might
>>>>>>> be
>>>>>>>>>>>>>>>>>>>> undesirable
>>>>>>>>>>>>>>>>>>>>>>>> and/or unexpected, for example:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>>>>>>>>>> Table b = …;
>>>>>>>>>>>>>>>>>>>>>>>> b.cache()
>>>>>>>>>>>>>>>>>>>>>>>> x = b.join(…)
>>>>>>>>>>>>>>>>>>>>>>>> y = b.count()
>>>>>>>>>>>>>>>>>>>>>>>> // ...
>>>>>>>>>>>>>>>>>>>>>>>> // 100
>>>>>>>>>>>>>>>>>>>>>>>> // hundred
>>>>>>>>>>>>>>>>>>>>>>>> // lines
>>>>>>>>>>>>>>>>>>>>>>>> // of
>>>>>>>>>>>>>>>>>>>>>>>> // code
>>>>>>>>>>>>>>>>>>>>>>>> // later
>>>>>>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…) // this might be even
>>>> hidden
>>>>>>> in
>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>>>> method/file/package/dependency
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Table b = ...
>>>>>>>>>>>>>>>>>>>>>>>> If (some_condition) {
>>>>>>>>>>>>>>>>>>>>>>>> foo(b)
>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>> Else {
>>>>>>>>>>>>>>>>>>>>>>>> bar(b)
>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…)
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Void foo(Table b) {
>>>>>>>>>>>>>>>>>>>>>>>> b.cache()
>>>>>>>>>>>>>>>>>>>>>>>> // do something with b
>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> In both above examples, `b.cache()` will implicitly
>>>>>>> affect
>>>>>>>>>>>>>>>>>>>> (semantic
>>>>>>>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>>>>>> program in case of sources being mutable and
>>>>>>> performance)
>>>>>>>>>>> `z
>>>>>>>>>>>> =
>>>>>>>>>>>>>>>>>>>>>>>> b.filter(…).groupBy(…)` which might be far from
>>>> obvious.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On top of that, there is still this argument of mine
>>>>>>> that
>>>>>>>>>>>>>>>>>> having
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> `MaterializedTable` or `CachedTable` handle is more
>>>>>>>>>>> flexible
>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> us
>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> future and for the user (as a manual option to
>> bypass
>>>>>>> cache
>>>>>>>>>>>>>>>>>>> reads).
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> But Jiangjie is correct,
>>>>>>>>>>>>>>>>>>>>>>>>> the source table in batching should be immutable.
>> It
>>>> is
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> user’s
>>>>>>>>>>>>>>>>>>>>>>>>> responsibility to ensure it, otherwise even a
>> regular
>>>>>>>>>>>>>>>>>> failover
>>>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>>>>>> lead
>>>>>>>>>>>>>>>>>>>>>>>>> to inconsistent results.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Yes, I agree that’s what perfect world/good
>> deployment
>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>> be.
>>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>>>>>>> often isn’t and while I’m not trying to fix this
>>>> (since
>>>>>>> the
>>>>>>>>>>>>>>>>>>> proper
>>>>>>>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> to support transactions), I’m just trying to
>> minimise
>>>>>>>>>>>> confusion
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> users that are not fully aware what’s going on and
>>>>>>> operate
>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> less
>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>>>> perfect setup. And if something bites them after
>>>> adding
>>>>>>>>>>>>>>>>>>> `b.cache()`
>>>>>>>>>>>>>>>>>>>>>> call,
>>>>>>>>>>>>>>>>>>>>>>>> to make sure that they at least know all of the
>> places
>>>>>>> that
>>>>>>>>>>>>>>>>>>> adding
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>> line can affect.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Thanks, Piotrek
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On 1 Dec 2018, at 15:39, Becket Qin <
>>>>>>> becket....@gmail.com
>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks again for the clarification. Some more
>> replies
>>>>>>> are
>>>>>>>>>>>>>>>>>>>>> following.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> But keep in mind that `.cache()` will/might not
>> only
>>>> be
>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> interactive
>>>>>>>>>>>>>>>>>>>>>>>>>> programming and not only in batching.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> It is true. Actually in stream processing, cache()
>>>> has
>>>>>>> the
>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>> semantic
>>>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>>> batch processing. The semantic is following:
>>>>>>>>>>>>>>>>>>>>>>>>> For a table created via a series of computation,
>> save
>>>>>>> that
>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> later
>>>>>>>>>>>>>>>>>>>>>>>>> reference to avoid running the computation logic to
>>>>>>>>>>>>>>>>>> regenerate
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> table.
>>>>>>>>>>>>>>>>>>>>>>>>> Once the application exits, drop all the cache.
>>>>>>>>>>>>>>>>>>>>>>>>> This semantic is same for both batch and stream
>>>>>>>>>>> processing.
>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>> difference
>>>>>>>>>>>>>>>>>>>>>>>>> is that stream applications will only run once as
>>>> they
>>>>>>> are
>>>>>>>>>>>>>>>>>> long
>>>>>>>>>>>>>>>>>>>>>>> running.
>>>>>>>>>>>>>>>>>>>>>>>>> And the batch applications may be run multiple
>> times,
>>>>>>>>>>> hence
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> cache
>>>>>>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>>>>>>>>> be created and dropped each time the application
>>>> runs.
>>>>>>>>>>>>>>>>>>>>>>>>> Admittedly, there will probably be some resource
>>>>>>>>>>> management
>>>>>>>>>>>>>>>>>>>>>>> requirements
>>>>>>>>>>>>>>>>>>>>>>>>> for the streaming cached table, such as time based
>> /
>>>>>>> size
>>>>>>>>>>>>>>>>>> based
>>>>>>>>>>>>>>>>>>>>>>>> retention,
>>>>>>>>>>>>>>>>>>>>>>>>> to address the infinite data issue. But such
>>>>>>> requirement
>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>>>>>>> the semantic.
>>>>>>>>>>>>>>>>>>>>>>>>> You are right that interactive programming is just
>>>> one
>>>>>>> use
>>>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>> cache().
>>>>>>>>>>>>>>>>>>>>>>>>> It is not the only use case.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> For me the more important issue is of not having
>> the
>>>>>>> `void
>>>>>>>>>>>>>>>>>>>> cache()`
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>> side effects.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> This is indeed the key point. The argument around
>>>>>>> whether
>>>>>>>>>>>>>>>>>>> cache()
>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> return something already indicates that cache() and
>>>>>>>>>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>>>>> address
>>>>>>>>>>>>>>>>>>>>>>>>> different issues.
>>>>>>>>>>>>>>>>>>>>>>>>> Can you explain a bit more one what are the side
>>>>>>> effects?
>>>>>>>>>>> So
>>>>>>>>>>>>>>>>>>> far
>>>>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>>> understanding is that such side effects only exist
>>>> if a
>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> mutable.
>>>>>>>>>>>>>>>>>>>>>>>>> Is that the case?
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I don’t know, probably initially we should make
>>>>>>>>>>> CachedTable
>>>>>>>>>>>>>>>>>>>>>> read-only.
>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>> don’t find it more confusing than the fact that
>> user
>>>>>>> can
>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> views
>>>>>>>>>>>>>>>>>>>>>>>>>> or materialised views in SQL or that user
>> currently
>>>>>>> can
>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>>>> to a
>>>>>>>>>>>>>>>>>>>>>>>>>> Table.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I don't think anyone should insert something to a
>>>>>>> cache.
>>>>>>>>>>> By
>>>>>>>>>>>>>>>>>>>>>> definition
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> cache should only be updated when the corresponding
>>>>>>>>>>> original
>>>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> updated. What I am wondering is that given the
>>>>>>> following
>>>>>>>>>>> two
>>>>>>>>>>>>>>>>>>>> facts:
>>>>>>>>>>>>>>>>>>>>>>>>> 1. If and only if a table is mutable (with
>> something
>>>>>>> like
>>>>>>>>>>>>>>>>>>>>> insert()),
>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>> CachedTable may have implicit behavior.
>>>>>>>>>>>>>>>>>>>>>>>>> 2. A CachedTable extends a Table.
>>>>>>>>>>>>>>>>>>>>>>>>> We can come to the conclusion that a CachedTable is
>>>>>>>>>>> mutable
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>> insert into the CachedTable directly. This is
>> where I
>>>>>>>>>>>> thought
>>>>>>>>>>>>>>>>>>>>>>> confusing.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <
>>>>>>>>>>>>>>>>>>>>>> pi...@data-artisans.com
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding naming `cache()` vs `materialize()`. One
>>>>>>> more
>>>>>>>>>>>>>>>>>>>>> explanation
>>>>>>>>>>>>>>>>>>>>>>> why
>>>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>> think `materialize()` is more natural to me is
>> that
>>>> I
>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>>>> “Table”s
>>>>>>>>>>>>>>>>>>>>>>>>>> in Table-API as views. They behave the same way as
>>>> SQL
>>>>>>>>>>>>>>>>>> views,
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>>>>>>>> difference for me is that their live scope is
>> short
>>>> -
>>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>>>> session
>>>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>>>> is limited by different execution model. That’s
>> why
>>>>>>>>>>>>>>>>>> “cashing”
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> view
>>>>>>>>>>>>>>>>>>>>>>>> for me
>>>>>>>>>>>>>>>>>>>>>>>>>> is just materialising it.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> However I see and I understand your point of view.
>>>>>>> Coming
>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>> DataSet/DataStream and generally speaking non-SQL
>>>>>>> world,
>>>>>>>>>>>>>>>>>>>> `cache()`
>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>>>>>> natural. But keep in mind that `.cache()`
>> will/might
>>>>>>> not
>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>> interactive programming and not only in batching.
>>>> But
>>>>>>>>>>>> naming
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>>>> issue,
>>>>>>>>>>>>>>>>>>>>>>>>>> and not that critical to me. Especially that once
>> we
>>>>>>>>>>>>>>>>>> implement
>>>>>>>>>>>>>>>>>>>>>> proper
>>>>>>>>>>>>>>>>>>>>>>>>>> materialised views, we can always deprecate/rename
>>>>>>>>>>>> `cache()`
>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>> deem
>>>>>>>>>>>>>>>>>>>>>>>> so.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> For me the more important issue is of not having
>> the
>>>>>>>>>>> `void
>>>>>>>>>>>>>>>>>>>>> cache()`
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>> side effects. Exactly for the reasons that you
>> have
>>>>>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>>>>>>>> True:
>>>>>>>>>>>>>>>>>>>>>>>>>> results might be non deterministic if underlying
>>>>>>> source
>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>> changing.
>>>>>>>>>>>>>>>>>>>>>>>>>> Problem is that `void cache()` implicitly changes
>>>> the
>>>>>>>>>>>>>>>>>> semantic
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>> subsequent uses of the cached/materialized Table.
>> It
>>>>>>> can
>>>>>>>>>>>>>>>>>> cause
>>>>>>>>>>>>>>>>>>>>> “wtf”
>>>>>>>>>>>>>>>>>>>>>>>> moment
>>>>>>>>>>>>>>>>>>>>>>>>>> for a user if he inserts “b.cache()” call in some
>>>>>>> place
>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> his
>>>>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>> suddenly some other random places are behaving
>>>>>>>>>>> differently.
>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>>>>> `materialize()` or `cache()` returns a Table
>> handle,
>>>>>>> we
>>>>>>>>>>>>>>>>>> force
>>>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>> explicitly use the cache which removes the
>> “random”
>>>>>>> part
>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> "suddenly
>>>>>>>>>>>>>>>>>>>>>>>>>> some other random places are behaving
>> differently”.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> This argument and others that I’ve raised (greater
>>>>>>>>>>>>>>>>>>>>>>> flexibility/allowing
>>>>>>>>>>>>>>>>>>>>>>>>>> user to explicitly bypass the cache) are
>> independent
>>>>>>> of
>>>>>>>>>>>>>>>>>>>> `cache()`
>>>>>>>>>>>>>>>>>>>>> vs
>>>>>>>>>>>>>>>>>>>>>>>>>> `materialize()` discussion.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that mean one can also insert into the
>>>>>>> CachedTable?
>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>>>>> sounds
>>>>>>>>>>>>>>>>>>>>>>>>>> pretty confusing.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> I don’t know, probably initially we should make
>>>>>>>>>>> CachedTable
>>>>>>>>>>>>>>>>>>>>>>> read-only. I
>>>>>>>>>>>>>>>>>>>>>>>>>> don’t find it more confusing than the fact that
>> user
>>>>>>> can
>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> views
>>>>>>>>>>>>>>>>>>>>>>>>>> or materialised views in SQL or that user
>> currently
>>>>>>> can
>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>>>> to a
>>>>>>>>>>>>>>>>>>>>>>>>>> Table.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> On 30 Nov 2018, at 17:38, Xingcan Cui <
>>>>>>>>>>> xingc...@gmail.com
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree with @Becket that `cache()` and
>>>>>>> `materialize()`
>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>> considered as two different methods where the
>> later
>>>>>>> one
>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>>>>>> sophisticated.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> According to my understanding, the initial idea
>> is
>>>>>>> just
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> introduce
>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>> simple cache or persist mechanism, but as the
>>>> TableAPI
>>>>>>>>>>> is a
>>>>>>>>>>>>>>>>>>>>>> high-level
>>>>>>>>>>>>>>>>>>>>>>>> API,
>>>>>>>>>>>>>>>>>>>>>>>>>> it’s naturally for as to think in a SQL way.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Maybe we can add the `cache()` method to the
>>>> DataSet
>>>>>>> API
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> force
>>>>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>>>>>>>>>> to translate a Table to a Dataset before caching
>> it.
>>>>>>> Then
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>> manually register the cached dataset to a table
>>>> again
>>>>>>> (we
>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>> table replacement mechanisms for datasets with an
>>>>>>>>>>> identical
>>>>>>>>>>>>>>>>>>>> schema
>>>>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>>>>> different contents here). After all, it’s the
>>>> dataset
>>>>>>>>>>>> rather
>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> dynamic table that need to be cached, right?
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Xingcan
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <
>>>>>>>>>>>>>>>>>>>> becket....@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek and Jark,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and explanation. Those
>> are
>>>>>>> good
>>>>>>>>>>>>>>>>>>>>> arguments.
>>>>>>>>>>>>>>>>>>>>>>>> But I
>>>>>>>>>>>>>>>>>>>>>>>>>>>> think those arguments are mostly about
>>>> materialized
>>>>>>>>>>> view.
>>>>>>>>>>>>>>>>>>> Let
>>>>>>>>>>>>>>>>>>>> me
>>>>>>>>>>>>>>>>>>>>>> try
>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> explain the reason I believe cache() and
>>>>>>> materialize()
>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>> different.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think cache() and materialize() have quite
>>>>>>> different
>>>>>>>>>>>>>>>>>>>>>> implications.
>>>>>>>>>>>>>>>>>>>>>>>> An
>>>>>>>>>>>>>>>>>>>>>>>>>>>> analogy I can think of is save()/publish(). When
>>>>>>> users
>>>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>>>>> cache(),
>>>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>> just like they are saving an intermediate result
>>>> as
>>>>>>> a
>>>>>>>>>>>>>>>>>> draft
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>>>>>>>>> work,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> this intermediate result may not have any
>>>> realistic
>>>>>>>>>>>>>>>>>> meaning.
>>>>>>>>>>>>>>>>>>>>>> Calling
>>>>>>>>>>>>>>>>>>>>>>>>>>>> cache() does not mean users want to publish the
>>>>>>> cached
>>>>>>>>>>>>>>>>>> table
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>>>>>>>>>>>> manner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> But when users call materialize(), that means "I
>>>>>>> have
>>>>>>>>>>>>>>>>>>>> something
>>>>>>>>>>>>>>>>>>>>>>>>>> meaningful
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be reused by others", now users need to think
>>>>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> validation,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> update & versioning, lifecycle of the result,
>> etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek's suggestions on variations of the
>>>>>>>>>>> materialize()
>>>>>>>>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>>>>>>>>>> useful. It would be great if Flink have them.
>> The
>>>>>>>>>>> concept
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>>>>>>>>> view is actually a pretty big feature, not to
>> say
>>>>>>> the
>>>>>>>>>>>>>>>>>>> related
>>>>>>>>>>>>>>>>>>>>>> stuff
>>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>>>>> triggers/hooks you mentioned earlier. I think
>> the
>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>>> view
>>>>>>>>>>>>>>>>>>>>>>>>>> itself
>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be discussed in a more thorough and
>>>>>>> systematic
>>>>>>>>>>>>>>>>>>> manner.
>>>>>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>>>>>> found
>>>>>>>>>>>>>>>>>>>>>>>>>>>> that discussion is kind of orthogonal and way
>>>> beyond
>>>>>>>>>>>>>>>>>>>> interactive
>>>>>>>>>>>>>>>>>>>>>>>>>>>> programming experience.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The example you gave was interesting. I still
>> have
>>>>>>> some
>>>>>>>>>>>>>>>>>>>>> questions,
>>>>>>>>>>>>>>>>>>>>>>>>>> though.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Table source = … // some source that scans files
>>>>>>> from a
>>>>>>>>>>>>>>>>>>>>> directory
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> “/foo/bar/“
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…)
>>>> ….;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily
>>>>>>>>>>>>>>>>>> initialised)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> int a1 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> int b1 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // something in the background (or we trigger
>> it)
>>>>>>>>>>> writes
>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>> files
>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> /foo/bar
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> int a2 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> int b2 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not
>> to
>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> implemented
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> initial version
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> what if someone else added some more files to
>>>>>>> /foo/bar
>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> point?
>>>>>>>>>>>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>>>>>>>>>>>>> that case, a3 won't equals to b3, and the result
>>>>>>> become
>>>>>>>>>>>>>>>>>>>>>>>>>> non-deterministic,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> int a3 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> int b3 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.drop() // another possible future extension,
>>>>>>> manual
>>>>>>>>>>>>>>>>>>>> “cache”
>>>>>>>>>>>>>>>>>>>>>>>> dropping
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> When we talk about interactive programming, in
>>>> most
>>>>>>>>>>>> cases,
>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>>> talking
>>>>>>>>>>>>>>>>>>>>>>>>>>>> about batch applications. A fundamental
>> assumption
>>>>>>> of
>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> source data is complete before the data
>> processing
>>>>>>>>>>>> begins,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>>>>>>>> will not change during the data processing. IMO,
>>>> if
>>>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>> rows
>>>>>>>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be added to some source during the
>> processing,
>>>> it
>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> done
>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>> ways
>>>>>>>>>>>>>>>>>>>>>>>>>>>> like union the source with another table
>>>> containing
>>>>>>> the
>>>>>>>>>>>>>>>>>> rows
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>> added.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> There are a few cases that computations are
>>>> executed
>>>>>>>>>>>>>>>>>>>> repeatedly
>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> changing data source.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, people may run a ML training job
>>>> every
>>>>>>>>>>> hour
>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> samples
>>>>>>>>>>>>>>>>>>>>>>>>>>>> newly added in the past hour. In that case, the
>>>>>>> source
>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>>>>> indeed change. But still, the data remain
>>>> unchanged
>>>>>>>>>>>> within
>>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>>>> run.
>>>>>>>>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>>>>>>>>> usually in that case, the result will need
>>>>>>> versioning,
>>>>>>>>>>>>>>>>>> i.e.
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>> given
>>>>>>>>>>>>>>>>>>>>>>>>>>>> result, it tells that the result is a result
>> from
>>>>>>> the
>>>>>>>>>>>>>>>>>> source
>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>>> by a
>>>>>>>>>>>>>>>>>>>>>>>>>>>> certain timestamp.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another example is something like data
>> warehouse.
>>>> In
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>>>>>>>> are a
>>>>>>>>>>>>>>>>>>>>>>>>>>>> few source of original/raw data. On top of those
>>>>>>>>>>> sources,
>>>>>>>>>>>>>>>>>>> many
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized
>>>>>>>>>>>>>>>>>>>>>>>>>>>> view / queries / reports / dashboards can be
>>>>>>> created to
>>>>>>>>>>>>>>>>>>>> generate
>>>>>>>>>>>>>>>>>>>>>>>> derived
>>>>>>>>>>>>>>>>>>>>>>>>>>>> data. Those derived data needs to be updated
>> when
>>>>>>> the
>>>>>>>>>>>>>>>>>>>> underlying
>>>>>>>>>>>>>>>>>>>>>>>>>> original
>>>>>>>>>>>>>>>>>>>>>>>>>>>> data changes. In that case, the processing logic
>>>>>>> that
>>>>>>>>>>>>>>>>>>> derives
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> original
>>>>>>>>>>>>>>>>>>>>>>>>>>>> data needs to be executed repeatedly to update
>>>> those
>>>>>>>>>>>>>>>>>>>>>> reports/views.
>>>>>>>>>>>>>>>>>>>>>>>>>> Again,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> all those derived data also need to ha
>>>>>>> 
>>>>>>> 
>>>> 
>>>> 
>>>> 
>> 
>> 

Reply via email to