Hi Becket,

With `uncache` there are probably two features that we can think about:

a)

Physically dropping the cached table from the storage, freeing up the resources

b)

Hinting the optimizer to not cache the reads for the next query/table

a) has the issue I wrote about before: it seems to be an operation that is 
inherently “flawed” by having side effects.

I’m not sure what the best way to express it would be. We could make it work:

1. Via a method on Table, as you proposed:

void Table#dropCache()
void Table#uncache()

2. Via an operation on the environment:

env.dropCacheFor(table) // or some other argument that allows the user to 
identify the desired cache

3. By extending the `setTableService` method (from your original design doc) to 
return some control handle, like:

TableServiceControl setTableService(TableFactory tf, 
                     TableProperties properties, 
                     TempTableCleanUpCallback cleanUpCallback);

(TableServiceControl? TableService? TableServiceHandle? CacheService?)

And having the drop cache method there:

TableServiceControl#dropCache(table)
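
For illustration, a minimal sketch of what such a handle could look like (all 
names here are placeholders, not a settled API):

public interface TableServiceControl {
  // Physically drops the cache backing the given table, freeing its resources.
  // Subsequent reads of that table fall back to re-computing the original DAG.
  void dropCache(Table table);
}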

Out of those options, option 1 has the disadvantage of not making the user 
aware that this is a global operation with side effects. Take the earlier 
example:

public void foo(Table t) {
  // …
  t.dropCache();
}

It might not be immediately obvious that `t.dropCache()` is some kind of global 
operation with side effects visible outside of the `foo` function.

On the other hand, both options 2 and 3 have a greater chance of catching 
the user’s attention:

public void foo(Table t, CacheService cacheService) {
  // …
  cacheService.dropCache(t);
}

b) could be achieved quite easily:

Table a = …
val notCached1 = a.doNotCache()
val cachedA = a.cache()
val notCached2 = cachedA.doNotCache() // equivalent of notCached1

`doNotCache()` would behave similarly to `cache()` - it would return a copy of 
the table with the “cache” hint removed and/or a “never cache” hint added.
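
To make the copy-with-hints idea concrete, a rough sketch of how `doNotCache()` 
could be implemented as pure hint manipulation (the `Hint` enum, the `hints` 
field and the `Table` constructor used here are hypothetical placeholders, not 
existing Flink code):

public Table doNotCache() {
  EnumSet<Hint> newHints = EnumSet.copyOf(hints);
  newHints.remove(Hint.CACHE);      // drop the “cache” hint, if present
  newHints.add(Hint.NEVER_CACHE);   // add the “never cache” hint
  // build a new Table over the same logical plan; `this` stays untouched
  return new Table(logicalPlan, newHints);
}

`cache()` would be the mirror image: remove NEVER_CACHE and add CACHE.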

Piotrek


> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for the proposal and detailed explanation. I like the idea of
> returning a new hinted Table without modifying the original table. This
> also leaves room for users to benefit from future implicit caching.
> 
> Just to make sure I get the full picture. In your proposal, there will also
> be a 'void Table#uncache()' method to release the cache, right?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com>
> wrote:
> 
>> Hi Becket!
>> 
>> After further thinking I tend to agree that my previous proposal (*Option
>> 2*) indeed might not be ideal if we introduce automatic caching in the
>> future. However I would like to propose a slightly modified version of it:
>> 
>> *Option 4*
>> 
>> Adding `cache()` method with following signature:
>> 
>> Table Table#cache();
>> 
>> It would have no side effects: the `cache()` call does not modify/change
>> the original Table in any way.
>> It would return a copy of the original table, with an added hint for the
>> optimizer to cache the table, so that future accesses to the returned
>> table might be cached or not.
>> 
>> Assume that we are talking about a setup where we do not have automatic
>> caching enabled (a possible future extension).
>> 
>> Example #1:
>> 
>> ```
>> Table a = …
>> a.foo() // not cached
>> 
>> val cachedA = a.cache();
>> 
>> cachedA.bar() // maybe cached
>> a.foo() // same as before - effectively not cached
>> ```
>> 
>> Both the first and the second `a.foo()` operations would behave in exactly
>> the same way. Again, the `a.cache()` call doesn’t affect `a` itself. If `a`
>> was not hinted for caching before `a.cache();`, then both `a.foo()` calls
>> wouldn’t use the cache.
>> 
>> The returned `cachedA` would carry the “cache” hint, so `cachedA.bar()`
>> would probably go through the cache (unless the optimiser decides
>> otherwise).
>> 
>> Example #2
>> 
>> ```
>> Table a = …
>> 
>> a.foo() // not cached
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // maybe cached
>> 
>> val c = b.cache();
>> 
>> a.foo() // same as before - effectively not cached
>> b.foo() // same as before - effectively maybe cached
>> c.foo() // maybe cached
>> ```
>> 
>> Now, assuming that we have some future “automatic caching optimisation”:
>> 
>> Example #3
>> 
>> ```
>> env.enableAutomaticCaching()
>> Table a = …
>> 
>> a.foo() // might be cached, depending if `a` was selected to automatic
>> caching
>> 
>> val b = a.cache();
>> 
>> a.foo() // same as before - might be cached, if `a` was selected to
>> automatic caching
>> b.foo() // maybe cached
>> ```
>> 
>> 
>> More or less this is the same behaviour as:
>> 
>> Table a = ...
>> val b = a.filter(x > 20)
>> 
>> calling `filter` hasn’t changed or altered `a` in any way. If `a` was
>> previously filtered:
>> 
>> Table src = …
>> val a = src.filter(x > 20)
>> val b = a.filter(x > 20)
>> 
>> then yes, `a` and `b` will be the same. But the point is that neither
>> `filter` nor `cache` changes the original `a` table.
>> 
>> One thing is that, indeed, a physical drop-cache operation will have side
>> effects and will in a way mutate the cached table references. But this is,
>> I think, unavoidable in any solution - the same issue as calling
>> `.close()`, or calling a destructor in C++.
>> 
>> Piotrek
>> 
>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Happy New Year, everybody!
>>> 
>>> I would like to resume this discussion thread. At this point, we have
>>> agreed on the first step goal of interactive programming. The open
>>> discussion is the exact API. More specifically, what should *cache()*
>>> method return and what is the semantic. There are three options:
>>> 
>>> *Option 1*
>>> *void cache()* OR *Table cache()* which returns the original table for
>>> chained calls.
>>> *void uncache() *releases the cache.
>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>> 
>>> - Semantic: a.cache() hints that table 'a' should be cached. Optimizer
>>> decides whether the cache will be used or not.
>>> - pros: simple and no confusion between CachedTable and original table
>>> - cons: A table may be cached / uncached in a method invocation, while
>> the
>>> caller does not know about this.
>>> 
>>> *Option 2*
>>> *CachedTable cache()*
>>> *CachedTable *extends *Table *with an additional *uncache()* method
>>> 
>>> - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will always
>>> use cache. *a.bar() *will always use original DAG.
>>> - pros: No potential side effects in method invocation.
>>> - cons: Optimizer has no chance to kick in. Future optimization will
>> become
>>> a behavior change and need users to change the code.
>>> 
>>> *Option 3*
>>> *CacheHandle cache()*
>>> *CacheHandle.release() *to release a cache handle on the table. If all
>>> cache handles are released, the cache could be removed.
>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>>> 
>>> - Semantic: *a.cache()* hints that 'a' should be cached. The optimizer
>>> decides whether the cache will be used or not. The cache is released
>>> either when no handle is on it, or when the user program exits.
>>> - pros: No potential side effect in method invocation. No confusion
>> between
>>> cached table v.s original table.
>>> - cons: An additional CacheHandle exposed to the users.
>>> 
>>> 
>>> Personally I prefer option 3 for the following reasons:
>>> 1. It is simple. The vast majority of users would just call *a.cache()*
>>> followed by *a.foo()*, *a.bar()*, etc.
>>> 2. There is no semantic ambiguity and semantic change if we decide to add
>>> implicit cache in the future.
>>> 3. There is no side effect in the method calls.
>>> 4. Admittedly we need to expose one more CacheHandle class to the users.
>>> But it is not that difficult to understand given the similar well-known
>>> concept of ref counting (we can name it CacheReference if that is easier
>>> to understand). So I think it is fine.
>>> 
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> 
>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com>
>> wrote:
>>> 
>>>> Hi Piotrek,
>>>> 
>>>> 1. Regarding optimization.
>>>> Sure, there are many cases where the decision is hard to make. But that
>>>> does not make it any easier for the users to make those decisions. I
>>>> imagine 99%
>>>> of the users would just naively use cache. I am not saying we can
>> optimize
>>>> in all the cases. But as long as we agree that at least in certain
>> cases (I
>>>> would argue most cases), optimizer can do a little better than an
>> average
>>>> user who likely knows little about Flink internals, we should not push
>> the
>>>> burden of optimization to users.
>>>> 
>>>> BTW, it seems some of your concerns are related to the implementation. I
>>>> did not mention the implementation of the caching service because that
>>>> should not affect the API semantic. Not sure if this helps, but imagine
>> the
>>>> default implementation has one StorageNode service colocating with each
>> TM.
>>>> It could be running within the TM process or in a standalone process,
>>>> depending on configuration.
>>>> 
>>>> The StorageNode uses a memory + spill-to-disk mechanism. The cached data
>>>> will just be written to the local StorageNode service. If the StorageNode
>>>> is running within the TM process, the in-memory cache could just be
>>>> objects, so we save some serde cost. A later job referring to the cached
>>>> Table will be scheduled in a locality-aware manner, i.e. run in the TM
>>>> whose peer StorageNode hosts the data.
>>>> 
>>>> 
>>>> 2. Semantic
>>>> I am not sure why introducing a new hintCache() or
>>>> env.enableAutomaticCaching() method would avoid the consequence of
>> semantic
>>>> change.
>>>> 
>>>> If the auto optimization is not enabled by default, users still need to
>>>> make code change to all existing programs in order to get the benefit.
>>>> If the auto optimization is enabled by default, advanced users who know
>>>> that they really want to use cache will suddenly lose the opportunity
>> to do
>>>> so, unless they change the code to disable auto optimization.
>>>> 
>>>> 
>>>> 3. side effect
>>>> The CacheHandle is not only for where to put uncache(). It is to solve
>> the
>>>> implicit performance impact by moving the uncache() to the CacheHandle.
>>>> 
>>>>  - If users want to leverage the cache, they can call a.cache(). After
>>>>  that, unless user explicitly release that CacheHandle, a.foo() will
>> always
>>>>  leverage cache if needed (optimizer may choose to ignore cache if that
>>>>  helps accelerate the process). Any function call will not be able to
>>>>  release the cache because they do not have that CacheHandle.
>>>>  - If some advanced users do not want to use cache at all, they will
>>>>  call a.hint(ignoreCache).foo(). This will for sure ignore cache and
>> use the
>>>>  original DAG to process.
>>>> 
>>>> 
>>>>> In vast majority of the cases, users wouldn't really care whether the
>>>>> cache is used or not.
>>>>> I wouldn’t agree with that, because “caching” (if not purely in memory
>>>>> caching) would add additional IO costs. It’s similar as saying that
>> users
>>>>> would not see a difference between Spark/Flink and MapReduce (MapReduce
>>>>> writes data to disks after every map/reduce stage).
>>>> 
>>>> What I wanted to say is that in most cases, after users call cache(),
>> they
>>>> don't really care about whether auto optimization has decided to ignore
>> the
>>>> cache or not, as long as the program runs faster.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <
>> pi...@data-artisans.com>
>>>> wrote:
>>>> 
>>>>> Hi,
>>>>> 
>>>>> Thanks for the quick answer :)
>>>>> 
>>>>> Re 1.
>>>>> 
>>>>> I generally agree with you, however couple of points:
>>>>> 
>>>>> a) the problem with using automatic caching is bigger, because you will
>>>>> have to decide how to compare IO vs CPU costs, and if you pick wrong,
>>>>> the additional IO costs might be enormous or can even crash your system.
>>>>> This is a more difficult problem compared to, let’s say, join
>>>>> reordering, where the only issue is to have good statistics that can
>>>>> capture correlations between columns (when you reorder joins, the
>>>>> number of IO operations does not change)
>>>>> c) your example is completely independent of caching.
>>>>> 
>>>>> Query like this:
>>>>> 
>>>>> src1.filter('f1 > 10).join(src2.filter('f2 < 30), 'f1 === 'f2).as('f3,
>>>>> …).filter('f3 > 30)
>>>>> 
>>>>> should/could be optimised to an empty result immediately, without the
>>>>> need for any cache/materialisation, and that should work even without
>>>>> any statistics provided by the connector.
>>>>> 
>>>>> For me, a prerequisite to any serious cost-based optimisations would be
>>>>> some reasonable benchmark coverage of the code (TPC-H?). Otherwise that
>>>>> would be equivalent to adding untested code, since we wouldn’t be able
>>>>> to verify our assumptions, like how the writing of 10 000 records to a
>>>>> cache/RocksDB/Kafka/CSV file compares to joining/filtering/processing
>>>>> of, let’s say, 1 000 000 rows.
>>>>> 
>>>>> Re 2.
>>>>> 
>>>>> I wasn’t proposing to change the semantic later. I was proposing that
>> we
>>>>> start now:
>>>>> 
>>>>> CachedTable cachedA = a.cache()
>>>>> cachedA.foo() // Cache is used
>>>>> a.bar() // Original DAG is used
>>>>> 
>>>>> And then later we can think about adding for example
>>>>> 
>>>>> CachedTable cachedA = a.hintCache()
>>>>> cachedA.foo() // Cache might be used
>>>>> a.bar() // Original DAG is used
>>>>> 
>>>>> Or
>>>>> 
>>>>> env.enableAutomaticCaching()
>>>>> a.foo() // Cache might be used
>>>>> a.bar() // Cache might be used
>>>>> 
>>>>> Or (I would still not like this option):
>>>>> 
>>>>> a.hintCache()
>>>>> a.foo() // Cache might be used
>>>>> a.bar() // Cache might be used
>>>>> 
>>>>> Or whatever else comes to our mind. Even if we add some automatic
>>>>> caching in the future, keeping explicit (`CachedTable cache()`) caching
>>>>> will still be useful, at least in some cases.
>>>>> 
>>>>> Re 3.
>>>>> 
>>>>>> 2. The source tables are immutable during one run of batch processing
>>>>> logic.
>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>> 
>>>>>> I think assumption 2 and 3 are by definition what batch processing
>>>>> means,
>>>>>> i.e the data must be complete before it is processed and should not
>>>>> change
>>>>>> when the processing is running.
>>>>> 
>>>>> I agree that this is how batch systems SHOULD work. However I know from
>>>>> my previous experience that it’s not always the case. Sometimes users
>>>>> are just working on some non-transactional storage, which can be (either
>>>>> constantly or occasionally) modified by some other processes for
>>>>> whatever reason (fixing the data, updating, adding new data etc).
>>>>> 
>>>>> But even if we ignore this point (data immutability), the performance
>>>>> side-effect issue of your proposal remains. If a user calls `void
>>>>> a.cache()` deep inside some private method, it will have implicit side
>>>>> effects on other parts of the program that might not be obvious.
>>>>> 
>>>>> Re `CacheHandle`.
>>>>> 
>>>>> If I understand it correctly, it only addresses the issue of where to
>>>>> place the `uncache`/`dropCache` method.
>>>>> 
>>>>> Btw,
>>>>> 
>>>>>> In vast majority of the cases, users wouldn't really care whether the
>>>>> cache is used or not.
>>>>> 
>>>>> I wouldn’t agree with that, because “caching” (if not purely in memory
>>>>> caching) would add additional IO costs. It’s similar as saying that
>> users
>>>>> would not see a difference between Spark/Flink and MapReduce (MapReduce
>>>>> writes data to disks after every map/reduce stage).
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Piotrek,
>>>>>> 
>>>>>> Not sure if you noticed, in my last email, I was proposing
>> `CacheHandle
>>>>>> cache()` to avoid the potential side effect due to function calls.
>>>>>> 
>>>>>> Let's look at the disagreement in your reply one by one.
>>>>>> 
>>>>>> 
>>>>>> 1. Optimization chances
>>>>>> 
>>>>>> Optimization is never trivial work. This is exactly why we should
>> not
>>>>> let
>>>>>> user manually do that. Databases have done huge amount of work in this
>>>>>> area. At Alibaba, we rely heavily on many optimization rules to boost
>>>>> the
>>>>>> SQL query performance.
>>>>>> 
>>>>>> In your example, if I fill in the filter conditions in a certain way,
>>>>>> the optimization would become obvious.
>>>>>> 
>>>>>> Table src1 = … // read from connector 1
>>>>>> Table src2 = … // read from connector 2
>>>>>> 
>>>>>> Table a = src1.filter('f1 > 10).join(src2.filter('f2 < 30), 'f1 ===
>>>>>> 'f2).as('f3, ...)
>>>>>> a.cache() // write cache to connector 3; when writing the records,
>>>>>> remember min and max of 'f1
>>>>>> 
>>>>>> a.filter('f3 > 30) // There is no need to read from any connector
>>>>> because
>>>>>> `a` does not contain any record whose 'f3 is greater than 30.
>>>>>> env.execute()
>>>>>> a.select(…)
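>>>>>> 
>>>>>> To illustrate the pruning idea, here is a rough sketch of the check the
>>>>>> optimizer could perform (the ColumnStats helper type is a hypothetical
>>>>>> placeholder, not actual Flink code):
>>>>>> 
>>>>>> // While writing the cache we track per-column min/max. A later filter
>>>>>> // whose predicate cannot match anything in [min, max] can skip reading
>>>>>> // the cached table entirely.
>>>>>> boolean canSkipScan(ColumnStats stats, double lowerBound) {
>>>>>>   // e.g. the filter 'f3 > 30 on a cache where max('f3) <= 30 must
>>>>>>   // return an empty result, so no scan is needed
>>>>>>   return stats.max() <= lowerBound;
>>>>>> }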
>>>>>> 
>>>>>> BTW, it seems to me that adding some basic statistics is fairly
>>>>>> straightforward and the cost is pretty marginal, if not negligible. In
>>>>>> fact it is not only needed for optimization, but also for cases such as
>>>>>> ML, where some algorithms may need to decide their parameters based on
>>>>>> the statistics of the data.
>>>>>> 
>>>>>> 
>>>>>> 2. Same API, one semantic now, another semantic later.
>>>>>> 
>>>>>> I am trying to understand what is the semantic of `CachedTable
>> cache()`
>>>>> you
>>>>>> are proposing. IMO, we should avoid designing an API whose semantic
>>>>> will be
>>>>>> changed later. If we have a "CachedTable cache()" method, then the
>>>>> semantic
>>>>>> should be very clearly defined upfront and do not change later. It
>>>>> should
>>>>>> never be "right now let's go with semantic 1, later we can silently
>>>>> change
>>>>>> it to semantic 2 or 3". Such change could result in bad consequence.
>> For
>>>>>> example, let's say we decide go with semantic 1:
>>>>>> 
>>>>>> CachedTable cachedA = a.cache()
>>>>>> cachedA.foo() // Cache is used
>>>>>> a.bar() // Original DAG is used.
>>>>>> 
>>>>>> Now majority of the users would be using cachedA.foo() in their code.
>>>>> And
>>>>>> some advanced users will use a.bar() to explicitly skip the cache.
>> Later
>>>>>> on, we added smart optimization and change the semantic to semantic 2:
>>>>>> 
>>>>>> CachedTable cachedA = a.cache()
>>>>>> cachedA.foo() // Cache is used
>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip cache if
>>>>> it is
>>>>>> faster.
>>>>>> 
>>>>>> Now most of the users who were writing cachedA.foo() will not benefit
>>>>> from
>>>>>> this optimization at all, unless they change their code to use a.foo()
>>>>>> instead. And those advanced users suddenly lose the option to
>> explicitly
>>>>>> ignore cache unless they change their code (assuming we care enough to
>>>>>> provide something like hint(useCache)). If we don't define the
>> semantic
>>>>>> carefully, our users will have to change their code again and again
>>>>> while
>>>>>> they shouldn't have to.
>>>>>> 
>>>>>> 
>>>>>> 3. side effect.
>>>>>> 
>>>>>> Before we talk about side effect, we have to agree on the assumptions.
>>>>> The
>>>>>> assumptions I have are following:
>>>>>> 1. We are talking about batch processing.
>>>>>> 2. The source tables are immutable during one run of batch processing
>>>>> logic.
>>>>>> 3. The cache is immutable during one run of batch processing logic.
>>>>>> 
>>>>>> I think assumption 2 and 3 are by definition what batch processing
>>>>> means,
>>>>>> i.e the data must be complete before it is processed and should not
>>>>> change
>>>>>> when the processing is running.
>>>>>> 
>>>>>> As far as I am aware, no batch processing system breaks those
>>>>>> assumptions. Even for relational database tables, where queries can
>>>>>> run with concurrent modifications, the necessary locking is still
>>>>>> required to ensure the integrity of the query result.
>>>>>> 
>>>>>> Please let me know if you disagree with the above assumptions. If you
>>>>> agree
>>>>>> with these assumptions, with the `CacheHandle cache()` API in my last
>>>>>> email, do you still see side effects?
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jiangjie (Becket) Qin
>>>>>> 
>>>>>> 
>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <
>> pi...@data-artisans.com
>>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Becket,
>>>>>>> 
>>>>>>>> Regarding the chance of optimization, it might not be that rare.
>> Some
>>>>>>> very
>>>>>>>> simple statistics could already help in many cases. For example,
>>>>> simply
>>>>>>>> maintaining max and min of each fields can already eliminate some
>>>>>>>> unnecessary table scan (potentially scanning the cached table) if
>> the
>>>>>>>> result is doomed to be empty. A histogram would give even further
>>>>>>>> information. The optimizer could be very careful and only ignores
>>>>> cache
>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter
>> on
>>>>>>> the
>>>>>>>> cache will absolutely return nothing.
>>>>>>> 
>>>>>>> I do not see how this might be easy to achieve. It would require tons
>>>>> of
>>>>>>> effort to make it work and in the end you would still have a problem
>> of
>>>>>>> comparing/trading CPU cycles vs IO. For example:
>>>>>>> 
>>>>>>> Table src1 = … // read from connector 1
>>>>>>> Table src2 = … // read from connector 2
>>>>>>> 
>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …)
>>>>>>> a.cache() // write cache to connector 3
>>>>>>> 
>>>>>>> a.filter(…)
>>>>>>> env.execute()
>>>>>>> a.select(…)
>>>>>>> 
>>>>>>> Decision whether it’s better to:
>>>>>>> A) read from connector1/connector2, filter/map and join them twice
>>>>>>> B) read from connector1/connector2, filter/map and join them once,
>> pay
>>>>> the
>>>>>>> price of writing to connector 3 and then reading from it
>>>>>>> 
>>>>>>> is very far from trivial. `a` can end up much larger than `src1` and
>>>>>>> `src2`, writes to connector 3 might be extremely slow, reads from
>>>>>>> connector 3 can be slower compared to reads from connectors 1 & 2, ….
>>>>>>> You really need extremely good statistics to correctly assess the size
>>>>>>> of the output, and it would still fail many times (correlations etc).
>>>>>>> And keep in mind that at the moment we do not have ANY statistics at
>>>>>>> all. More than that, it would require significantly more testing and
>>>>>>> setting up benchmarks to make sure that we do not break it with
>>>>>>> regressions.
>>>>>>> 
>>>>>>> That’s why I’m strongly opposing this idea - at least let’s not start
>>>>>>> with this. If we first start with completely manual/explicit caching,
>>>>>>> without any magic, it would be a significant improvement for the
>> users
>>>>> for
>>>>>>> a fraction of the development cost. After implementing that, when we
>>>>>>> already have all of the working pieces, we can start working on some
>>>>>>> optimisations rules. As I wrote before, if we start with
>>>>>>> 
>>>>>>> `CachedTable cache()`
>>>>>>> 
>>>>>>> We can later work on follow-up stories to make it automatic. Even
>>>>>>> though I don’t like this implicit/side-effect approach with a `void`
>>>>>>> method, having an explicit `CachedTable cache()` wouldn’t even prevent
>>>>>>> us from later adding a `void hintCache()` method, with the exact
>>>>>>> semantic that you want.
>>>>>>> 
>>>>>>> On top of that I re-raise that having an implicit `void
>>>>>>> cache()/hintCache()` has other side effects and problems with
>>>>>>> non-immutable data, and is annoying when used secretly inside methods.
>>>>>>> 
>>>>>>> An explicit `CachedTable cache()` just looks like a much less
>>>>>>> controversial MVP, and if we decide to go further with this topic it’s
>>>>>>> not a wasted effort, but lies on a straight path to more
>>>>>>> advanced/complicated solutions in the future. Are there any drawbacks
>>>>>>> of starting with `CachedTable cache()` that I’m missing?
>>>>>>> 
>>>>>>> Piotrek
>>>>>>> 
>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Becket,
>>>>>>>> 
>>>>>>>> Introducing CacheHandle seems too complicated. That means users have
>>>>>>>> to maintain the handle properly.
>>>>>>>> 
>>>>>>>> And since cache is just a hint for the optimizer, why not just
>>>>>>>> return the Table itself from the cache method? This hint info should
>>>>>>>> be kept in the Table, I believe.
>>>>>>>> 
>>>>>>>> So how about adding cache and uncache methods to Table, both
>>>>>>>> returning a Table? Because what cache and uncache do is just add some
>>>>>>>> hint info to the Table.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Becket Qin <becket....@gmail.com> wrote on Wed, Dec 12, 2018, 11:25 AM:
>>>>>>>> 
>>>>>>>>> Hi Till and Piotrek,
>>>>>>>>> 
>>>>>>>>> Thanks for the clarification. That clears up quite a bit of
>>>>>>>>> confusion. My understanding of how cache works is the same as what
>>>>>>>>> Till described, i.e. cache() is a hint to Flink, but it is not
>>>>>>>>> guaranteed that the cache always exists, and it might be recomputed
>>>>>>>>> from its lineage.
>>>>>>>>> 
>>>>>>>>> Is this the core of our disagreement here? That you would like this
>>>>>>>>>> “cache()” to be mostly hint for the optimiser?
>>>>>>>>> 
>>>>>>>>> Semantic wise, yes. That's also why I think materialize() has a
>> much
>>>>>>> larger
>>>>>>>>> scope than cache(), thus it should be a different method.
>>>>>>>>> 
>>>>>>>>> Regarding the chance of optimization, it might not be that rare.
>> Some
>>>>>>> very
>>>>>>>>> simple statistics could already help in many cases. For example,
>>>>> simply
>>>>>>>>> maintaining max and min of each fields can already eliminate some
>>>>>>>>> unnecessary table scan (potentially scanning the cached table) if
>> the
>>>>>>>>> result is doomed to be empty. A histogram would give even further
>>>>>>>>> information. The optimizer could be very careful and only ignores
>>>>> cache
>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter
>>>>> on
>>>>>>> the
>>>>>>>>> cache will absolutely return nothing.
>>>>>>>>> 
>>>>>>>>> Given the above clarification on cache, I would like to revisit the
>>>>>>>>> original "void cache()" proposal and see if we can improve on top
>> of
>>>>>>> that.
>>>>>>>>> 
>>>>>>>>> What do you think about the following modified interface?
>>>>>>>>> 
>>>>>>>>> Table {
>>>>>>>>>   /**
>>>>>>>>>    * This call hints Flink to maintain a cache of this table and
>>>>>>>>>    * leverage it for performance optimization if needed.
>>>>>>>>>    * Note that Flink may still decide not to use the cache if that
>>>>>>>>>    * is cheaper.
>>>>>>>>>    *
>>>>>>>>>    * A CacheHandle will be returned to allow the user to actively
>>>>>>>>>    * release the cache. The cache will be deleted if there are no
>>>>>>>>>    * unreleased cache handles to it. When the TableEnvironment is
>>>>>>>>>    * closed, the cache will also be deleted and all the cache
>>>>>>>>>    * handles will be released.
>>>>>>>>>    *
>>>>>>>>>    * @return a CacheHandle referring to the cache of this table.
>>>>>>>>>    */
>>>>>>>>>   CacheHandle cache();
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> CacheHandle {
>>>>>>>>>   /**
>>>>>>>>>    * Close the cache handle. This method does not necessarily delete
>>>>>>>>>    * the cache. Instead, it simply decrements the reference counter
>>>>>>>>>    * to the cache. When there is no handle referring to a cache, the
>>>>>>>>>    * cache will be deleted.
>>>>>>>>>    *
>>>>>>>>>    * @return the number of open handles to the cache after this
>>>>>>>>>    * handle has been released.
>>>>>>>>>    */
>>>>>>>>>   int release();
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> The rationale behind this interface is the following:
>>>>>>>>> In the vast majority of cases, users wouldn't really care whether
>>>>>>>>> the cache is used or not. So I think the most intuitive way is
>>>>>>>>> letting cache() return nothing, so nobody needs to worry about the
>>>>>>>>> difference between operations on CachedTables and those on the
>>>>>>>>> "original" tables. This will make maybe 99.9% of the users happy.
>>>>>>>>> There were two concerns raised for this approach:
>>>>>>>>> 1. In some rare cases, users may want to ignore the cache.
>>>>>>>>> 2. A table might be cached/uncached in a third-party function while
>>>>>>>>> the caller does not know.
>>>>>>>>> 
>>>>>>>>> For the first issue, users can use hint("ignoreCache") to
>> explicitly
>>>>>>> ignore
>>>>>>>>> cache.
>>>>>>>>> For the second issue, the above proposal lets cache() return a
>>>>>>>>> CacheHandle, whose only method is release(). Different CacheHandles
>>>>>>>>> will refer to the same cache; if a cache no longer has any cache
>>>>>>>>> handle, it will be deleted. This will address the following case:
>>>>>>>>> {
>>>>>>>>> val handle1 = a.cache()
>>>>>>>>> process(a)
>>>>>>>>> a.select(...) // cache is still available, handle1 has not been
>>>>>>> released.
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> void process(Table t) {
>>>>>>>>> val handle2 = t.cache() // new handle to cache
>>>>>>>>> t.select(...) // optimizer decides cache usage
>>>>>>>>> t.hint("ignoreCache").select(...) // cache is ignored
>>>>>>>>> handle2.release() // release the handle, but the cache may still be
>>>>>>>>> available if there are other handles
>>>>>>>>> ...
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> Does the above modified approach look reasonable to you?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <
>> trohrm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Becket,
>>>>>>>>>> 
>>>>>>>>>> I was aiming at semantics similar to 1. I actually thought that
>>>>>>> `cache()`
>>>>>>>>>> would tell the system to materialize the intermediate result so
>> that
>>>>>>>>>> subsequent queries don't need to reprocess it. This means that the
>>>>>>> usage
>>>>>>>>> of
>>>>>>>>>> the cached table in this example
>>>>>>>>>> 
>>>>>>>>>> {
>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>> val b1 = cachedTable.select(…)
>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>>>>>>>>>> val c1 = a.select(…)
>>>>>>>>>> val c2 = a.foo().select(…)
>>>>>>>>>> val c3 = a.bar().select(...)
>>>>>>>>>> }
>>>>>>>>>> 
>>>>>>>>>> strongly depends on interleaved calls which trigger the execution
>>>>>>>>>> of sub-queries. So for example, if there is only a single
>>>>>>>>>> env.execute call at the end of the block, then b1, b2, b3, c1, c2
>>>>>>>>>> and c3 would all be computed by
>>>>>>>>>> reading directly from the sources (given that there is only a
>> single
>>>>>>>>>> JobGraph). It just happens that the result of `a` will be cached
>>>>> such
>>>>>>>>> that
>>>>>>>>>> we skip the processing of `a` when there are subsequent queries
>>>>> reading
>>>>>>>>>> from `cachedTable`. If for some reason the system cannot
>> materialize
>>>>>>> the
>>>>>>>>>> table (e.g. running out of disk space, ttl expired), then it could
>>>>> also
>>>>>>>>>> happen that we need to reprocess `a`. In that sense `cachedTable`
>>>>>>> simply
>>>>>>>>> is
>>>>>>>>>> an identifier for the materialized result of `a` with the lineage
>>>>> how
>>>>>>> to
>>>>>>>>>> reprocess it.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <
>>>>>>> pi...@data-artisans.com
>>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Becket,
>>>>>>>>>>> 
>>>>>>>>>>>> {
>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> Semantic 1. b uses cachedTable, as the user demanded. c uses the
>>>>>>>>>>>> original DAG, as the user demanded. In this case, the optimizer
>>>>>>>>>>>> has no chance to optimize.
>>>>>>>>>>>> Semantic 2. b uses cachedTable, as the user demanded. c leaves the
>>>>>>>>>>>> optimizer to choose whether the cache or DAG should be used. In
>>>>>>>>>>>> this case, the user loses the option to NOT use the cache.
>>>>>>>>>>>> 
>>>>>>>>>>>> As you can see, neither of the options seems perfect. However, I
>>>>>>>>>>>> guess you and Till are proposing the third option:
>>>>>>>>>>>> 
>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or DAG
>>>>>>>>>>>> should be used. c always uses the DAG.
>>>>>>>>>>> 
>>>>>>>>>>> I am pretty sure that Till, Fabian, others and I were all
>>>>>>>>>>> proposing and advocating in favour of semantic “1”. No cost-based
>>>>>>>>>>> optimiser decisions at all.
>>>>>>>>>>> 
>>>>>>>>>>> {
>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>> val b1 = cachedTable.select(…)
>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>>>>>>>>>>> val c1 = a.select(…)
>>>>>>>>>>> val c2 = a.foo().select(…)
>>>>>>>>>>> val c3 = a.bar().select(...)
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> All b1, b2 and b3 are reading from the cache, while c1, c2 and c3
>>>>>>>>>>> are re-executing the whole plan for “a”.
>>>>>>>>>>> 
>>>>>>>>>>> In the future we could discuss going one step further,
>> introducing
>>>>>>> some
>>>>>>>>>>> global optimisation (that can be manually enabled/disabled):
>>>>>>>>> deduplicate
>>>>>>>>>>> plan nodes/deduplicate sub queries/re-use sub queries results/or
>>>>>>>>> whatever
>>>>>>>>>>> we could call it. It could do two things:
>>>>>>>>>>> 
>>>>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and
>> share
>>>>>>> the
>>>>>>>>>>> result using CachedTable - in other words automatically insert
>>>>>>>>>> `CachedTable
>>>>>>>>>>> cache()` calls.
>>>>>>>>>>> 2. Automatically make decision to bypass explicit `CachedTable`
>>>>> access
>>>>>>>>>>> (this would be the equivalent of what you described as “semantic
>>>>> 3”).
>>>>>>>>>>> 
>>>>>>>>>>> However as I wrote previously, I have big doubts if such
>> cost-based
>>>>>>>>>>> optimisation would work (this applies also to “Semantic 2”). I
>>>>> would
>>>>>>>>>> expect
>>>>>>>>>>> it to do more harm than good in so many cases, that it wouldn’t
>>>>> make
>>>>>>>>>> sense.
>>>>>>>>>>> Even assuming that we calculate statistics perfectly (this ain’t
>>>>> gonna
>>>>>>>>>>> happen), it’s virtually impossible to correctly estimate the
>>>>>>>>>>> exchange rate of CPU cycles vs IO operations, as it changes so much
>>>>>>>>>>> from deployment to deployment.
>>>>>>>>>>> 
>>>>>>>>>>> Is this the core of our disagreement here? That you would like
>> this
>>>>>>>>>>> “cache()” to be mostly hint for the optimiser?
>>>>>>>>>>> 
>>>>>>>>>>> Piotrek
>>>>>>>>>>> 
>>>>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com>
>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Another potential concern for semantic 3 is that, in the future,
>>>>>>>>>>>> we may add
>>>>>>>>>>>> automatic caching to Flink. e.g. cache the intermediate results
>> at
>>>>>>>>> the
>>>>>>>>>>>> shuffle boundary. If our semantic is that reference to the
>>>>> original
>>>>>>>>>> table
>>>>>>>>>>>> means skipping cache, those users may not be able to benefit
>> from
>>>>> the
>>>>>>>>>>>> implicit cache.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <
>> becket....@gmail.com
>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the reply. Thought about it again, I might have
>>>>>>>>>> misunderstood
>>>>>>>>>>>>> your proposal in earlier emails. Returning a CachedTable might
>>>>> not
>>>>>>>>> be
>>>>>>>>>> a
>>>>>>>>>>> bad
>>>>>>>>>>>>> idea.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I was more concerned about the semantic and its intuitiveness
>>>>> when a
>>>>>>>>>>>>> CachedTable is returned, i.e., if cache() returns a CachedTable.
>>>>>>>>>>>>> What are the semantics in the following code:
>>>>>>>>>>>>> {
>>>>>>>>>>>>> val cachedTable = a.cache()
>>>>>>>>>>>>> val b = cachedTable.select(...)
>>>>>>>>>>>>> val c = a.select(...)
>>>>>>>>>>>>> }
>>>>>>>>>>>>> What is the difference between b and c? At the first glance, I
>>>>> see
>>>>>>>>> two
>>>>>>>>>>>>> options:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Semantic 1. b uses cachedTable, as the user demanded. c uses the
>>>>>>>>>>>>> original DAG, as the user demanded. In this case, the optimizer
>>>>>>>>>>>>> has no chance to optimize.
>>>>>>>>>>>>> Semantic 2. b uses cachedTable, as the user demanded. c leaves
>>>>>>>>>>>>> the optimizer to choose whether the cache or DAG should be used.
>>>>>>>>>>>>> In this case, the user loses the option to NOT use the cache.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> As you can see, neither of the options seems perfect. However,
>>>>>>>>>>>>> I guess you and Till are proposing the third option:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or
>>>>>>>>>>>>> DAG should be used. c always uses the DAG.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This does address all the concerns. It is just that, from an
>>>>>>>>>>>>> intuitiveness perspective, I find asking users to explicitly use
>>>>>>>>>>>>> a CachedTable that the optimizer might choose to ignore a little
>>>>>>>>>>>>> weird. That was why I did not think about that semantic. But
>>>>>>>>>>>>> given there is material benefit, I think this semantic is
>>>>>>>>>>>>> acceptable.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
>>>>> cache
>>>>>>>>> or
>>>>>>>>>>> not,
>>>>>>>>>>>>>> then why do we need “void cache()” method at all? Would It
>>>>>>>>>> “increase”
>>>>>>>>>>> the
>>>>>>>>>>>>>> chance of using the cache? That’s sounds strange. What would
>> be
>>>>> the
>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
>>>>> want
>>>>>>>>> to
>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
>>>>>>>>>>> deduplication”
>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>>>>> optimiser
>>>>>>>>> do
>>>>>>>>>>> all of
>>>>>>>>>>>>>> the work.
>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use
>>>>>>>>> cache
>>>>>>>>>>>>>> decision.
>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such
>>>>> cost
>>>>>>>>>>> based
>>>>>>>>>>>>>> optimisations would work properly and I would still insist
>>>>> first on
>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> We are absolutely on the same page here. An explicit cache()
>>>>> method
>>>>>>>>> is
>>>>>>>>>>>>> necessary not only because optimizer may not be able to make
>> the
>>>>>>>>> right
>>>>>>>>>>>>> decision, but also because of the nature of interactive
>>>>> programming.
>>>>>>>>>> For
>>>>>>>>>>>>> example, if users write the following code in Scala shell:
>>>>>>>>>>>>> val b = a.select(...)
>>>>>>>>>>>>> val c = b.select(...)
>>>>>>>>>>>>> val d = c.select(...).writeToSink(...)
>>>>>>>>>>>>> tEnv.execute()
>>>>>>>>>>>>> There is no way optimizer will know whether b or c will be used
>>>>> in
>>>>>>>>>> later
>>>>>>>>>>>>> code, unless users hint explicitly.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our
>>>>>>>>> objections
>>>>>>>>>> of
>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
>>>>> Jark,
>>>>>>>>>>> Fabian,
>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Is there any other side effects if we use semantic 3 mentioned
>>>>>>>>> above?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> JIangjie (Becket) Qin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
>>>>>>>>>> pi...@data-artisans.com
>>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sorry for not responding long time.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regarding case1.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> There wouldn’t be an “a.unCache()” method; I would expect only
>>>>>>>>>>>>>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t
>>>>>>>>>>>>>> affect `cachedTableA2`. Just as in any other database,
>>>>>>>>>>>>>> dropping/modifying one independent table/materialised view does
>>>>>>>>>>>>>> not affect others.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> What I meant is that assuming there is already a cached
>> table,
>>>>>>>>>> ideally
>>>>>>>>>>>>>> users need
>>>>>>>>>>>>>>> not to specify whether the next query should read from the
>>>>> cache
>>>>>>>>> or
>>>>>>>>>>> use
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use
>>>>> cache
>>>>>>>>> or
>>>>>>>>>>>>>> not, then why do we need “void cache()” method at all? Would
>> It
>>>>>>>>>>> “increase”
>>>>>>>>>>>>>> the chance of using the cache? That’s sounds strange. What
>>>>> would be
>>>>>>>>>> the
>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we
>>>>> want
>>>>>>>>> to
>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan nodes
>>>>>>>>>>> deduplication”
>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>>>>> optimiser
>>>>>>>>> do
>>>>>>>>>>> all of
>>>>>>>>>>>>>> the work.
>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use
>>>>>>>>> cache
>>>>>>>>>>>>>> decision.
>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such
>>>>> cost
>>>>>>>>>>> based
>>>>>>>>>>>>>> optimisations would work properly and I would still insist
>>>>> first on
>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`)
>>>>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()`
>> doesn’t
>>>>>>>>>>>>>> contradict future work on automated cost based caching.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our
>>>>>>>>> objections
>>>>>>>>>>> of
>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which me,
>>>>> Jark,
>>>>>>>>>>> Fabian,
>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com>
>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> It is true that after the first job submission, there will be
>>>>> no
>>>>>>>>>>>>>> ambiguity
>>>>>>>>>>>>>>> in terms of whether a cached table is used or not. That is
>> the
>>>>>>>>> same
>>>>>>>>>>> for
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> cache() without returning a CachedTable.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
>>>>> caching
>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>> from which you need to consume from if you want to benefit
>>>>> from
>>>>>>>>> the
>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>> functionality.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am thinking a little differently. I think it is a hint (as
>>>>> you
>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful
>> about
>>>>> the
>>>>>>>>>>>>>> semantic
>>>>>>>>>>>>>>> of the API. A hint is a property set on an existing operator,
>>>>> but
>>>>>>>>> is
>>>>>>>>>>> not
>>>>>>>>>>>>>>> itself an operator as it does not really manipulate the data.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision
>>>>> which
>>>>>>>>>>>>>>>> intermediate result should be cached. But especially when
>>>>>>>>> executing
>>>>>>>>>>>>>> ad-hoc
>>>>>>>>>>>>>>>> queries the user might better know which results need to be
>>>>>>>>> cached
>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
>>>>> consider
>>>>>>>>>> the
>>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
>> the
>>>>>>>>>> future
>>>>>>>>>>> we
>>>>>>>>>>>>>>>> might add functionality which tries to automatically cache
>>>>>>>>> results
>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>> caching the latest intermediate results until so and so much
>>>>>>>>> space
>>>>>>>>>> is
>>>>>>>>>>>>>>>> used). But this should hopefully not contradict with
>>>>> `CachedTable
>>>>>>>>>>>>>> cache()`.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I agree that cache() method is needed for exactly the reason
>>>>> you
>>>>>>>>>>>>>> mentioned,
>>>>>>>>>>>>>>> i.e. Flink cannot predict what users are going to write
>> later,
>>>>> so
>>>>>>>>>>> users
>>>>>>>>>>>>>>> need to tell Flink explicitly that this table will be used
>>>>> later.
>>>>>>>>>>> What I
>>>>>>>>>>>>>>> meant is that assuming there is already a cached table,
>> ideally
>>>>>>>>>> users
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>> not to specify whether the next query should read from the
>>>>> cache
>>>>>>>>> or
>>>>>>>>>>> use
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> To explain the difference between returning / not returning a
>>>>>>>>>>>>>> CachedTable,
>>>>>>>>>>>>>>> I want to compare the following two cases:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 1:  returning a CachedTable*
>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // User specify that the original DAG is
>>>>> used?
>>>>>>>>> Or
>>>>>>>>>>> the
>>>>>>>>>>>>>>> optimizer decides whether DAG or cache should be used?
>>>>>>>>>>>>>>> d = cachedTableA1.filter() // User specify that the cached
>>>>> table
>>>>>>>>> is
>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards?
>>>>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 2: not returning a CachedTable*
>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
>> DAG
>>>>>>>>>> should
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or
>> DAG
>>>>>>>>>> should
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In case 1, semantics-wise, the optimizer loses the option to
>>>>>>>>>>>>>>> choose between DAG and cache. And the unCache() call becomes
>>>>>>>>>>>>>>> tricky.
>>>>>>>>>>>>>>> In case 2, users do not need to worry about whether cache or
>>>>> DAG
>>>>>>>>> is
>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>> And the unCache() semantic is clear. However, the caveat is
>>>>> that
>>>>>>>>>> users
>>>>>>>>>>>>>>> cannot explicitly ignore the cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In order to address the issues mentioned in case 2, and
>>>>>>>>>>>>>>> inspired by the discussion so far, I am thinking about using a
>>>>>>>>>>>>>>> hint to allow users to explicitly ignore the cache. Although
>>>>>>>>>>>>>>> we do not have hints yet, we probably should have one. So the
>>>>>>>>>>>>>>> code becomes:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> *Case 3: returning this table*
>>>>>>>>>>>>>>> b = a.map()
>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>> a.cache() // no-op
>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or
>> DAG
>>>>>>>>>> should
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used
>>>>> instead
>>>>>>>>> of
>>>>>>>>>>> the
>>>>>>>>>>>>>>> cache.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a.unCache()
>>>>>>>>>>>>>>> a.unCache() // no-op
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> We could also let cache() return this table to allow chained
>>>>>>>>> method
>>>>>>>>>>>>>> calls.
>>>>>>>>>>>>>>> Do you think this API addresses the concerns?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com>
>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> All the recent discussions have focused on whether there is a
>>>>>>>>>>>>>>>> problem if cache() does not return a Table.
>>>>>>>>>>>>>>>> It seems that returning a Table explicitly is clearer (and
>>>>>>>>>>>>>>>> safer?).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> So are there any problems if cache() returns a Table? @Becket
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <
>>>>> trohrm...@apache.org
>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the
>> original
>>>>> DAG
>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> generates a. But all subsequent operators (when running
>>>>> multiple
>>>>>>>>>>>>>> queries)
>>>>>>>>>>>>>>>>> which reference cachedTableA should not need to reproduce
>> `a`
>>>>>>>>> but
>>>>>>>>>>>>>>>> directly
>>>>>>>>>>>>>>>>> consume the intermediate result.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
>>>>> caching
>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>> from which you need to consume from if you want to benefit
>>>>> from
>>>>>>>>>> the
>>>>>>>>>>>>>>>> caching
>>>>>>>>>>>>>>>>> functionality.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision
>>>>> which
>>>>>>>>>>>>>>>>> intermediate result should be cached. But especially when
>>>>>>>>>> executing
>>>>>>>>>>>>>>>> ad-hoc
>>>>>>>>>>>>>>>>> queries the user might better know which results need to be
>>>>>>>>> cached
>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would
>>>>>>>>> consider
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in
>> the
>>>>>>>>>> future
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> might add functionality which tries to automatically cache
>>>>>>>>> results
>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>>>> caching the latest intermediate results until so and so
>> much
>>>>>>>>> space
>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> used). But this should hopefully not contradict with
>>>>>>>>> `CachedTable
>>>>>>>>>>>>>>>> cache()`.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <
>>>>> becket....@gmail.com
>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little
>> confused.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might
>> become:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> cachedTableA = a.cache()
>>>>>>>>>>>>>>>>>> d = cachedTableA.map(...)
>>>>>>>>>>>>>>>>>> e = a.map()
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d
>>>>> and
>>>>>>>>> e
>>>>>>>>>>> are
>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>> going to be reading from the original DAG that generates
>> a.
>>>>> But
>>>>>>>>>>> with
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> naive expectation, d should be reading from the cache.
>> This
>>>>>>>>> seems
>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> solving the potential confusion you raised, right?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Just to be clear, my understanding is all based on the
>>>>>>>>>>>>>>>>>> assumption that the tables are immutable. Therefore, after
>>>>>>>>>>>>>>>>>> a.cache(), the *cachedTableA* and the original table *a*
>>>>>>>>>>>>>>>>>> should be completely interchangeable.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> That said, I think a valid argument is optimization. There
>>>>>>>>>>>>>>>>>> are indeed cases where reading from the original DAG could
>>>>>>>>>>>>>>>>>> be faster than reading from the cache. For example:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> a.filter('f1 > 100)
>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>> b = a.filter('f1 < 100)
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Ideally the optimizer should be intelligent enough to
>> decide
>>>>>>>>>> which
>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> faster, without user intervention. In this case, it will
>>>>>>>>> identify
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> b
>>>>>>>>>>>>>>>>>> would just be an empty table, thus skip reading from the
>>>>> cache
>>>>>>>>>>>>>>>>> completely.
>>>>>>>>>>>>>>>>>> But I agree that returning a CachedTable would give user
>> the
>>>>>>>>>>> control
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> when to use cache, even though I still feel that letting
>> the
>>>>>>>>>>>>>> optimizer
>>>>>>>>>>>>>>>>>> handle this is a better option in long run.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <
>>>>>>>>>> trohrm...@apache.org
>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Yes you are right Becket that it still depends on the
>>>>> actual
>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> the job whether a consumer reads from a cached result or
>>>>> not.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> My point was actually about the properties of a (cached
>> vs.
>>>>>>>>>>>>>>>> non-cached)
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> not about the execution. I would not make cache trigger
>> the
>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>> the job because one loses some flexibility by eagerly
>>>>>>>>> triggering
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> execution.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I tried to argue for an explicit CachedTable which is
>>>>> returned
>>>>>>>>>> by
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> cache() method like Piotr did in order to make the API
>> more
>>>>>>>>>>>>>> explicit.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <
>>>>>>>>> becket....@gmail.com
>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Till,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> That is a good example. Just a minor correction: in this
>>>>>>>>>>>>>>>>>>>> case, b, c and d will all consume from a non-cached a.
>>>>>>>>>>>>>>>>>>>> This is because the cache will only be created on the very
>>>>>>>>>>>>>>>>>>>> first job submission that generates the table to be
>>>>>>>>>>>>>>>>>>>> cached.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> If I understand correctly, this example is about whether
>>>>>>>>>>>>>>>>>>>> the .cache() method should be eagerly or lazily evaluated.
>>>>>>>>>>>>>>>>>>>> In other words, if the cache() method actually triggers a
>>>>>>>>>>>>>>>>>>>> job that creates the cache, there will be no such
>>>>>>>>>>>>>>>>>>>> confusion. Is that right?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> In the example, although d will not consume from the
>>>>>>>>>>>>>>>>>>>> cached Table while it looks like it should, from a
>>>>>>>>>>>>>>>>>>>> correctness perspective the code will still return the
>>>>>>>>>>>>>>>>>>>> correct result, assuming that tables are immutable.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Personally I feel it is OK because users probably won't
>>>>>>>>>>>>>>>>>>>> really worry about whether the table is cached or not.
>>>>>>>>>>>>>>>>>>>> And a lazy cache could avoid some unnecessary caching if
>>>>>>>>>>>>>>>>>>>> a cached table is never actually used in the user
>>>>>>>>>>>>>>>>>>>> application. But I am not opposed to doing eager
>>>>>>>>>>>>>>>>>>>> evaluation of the cache.
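>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For instance (purely illustrative):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> a.cache();          // lazily declared, nothing materialized
>>>>>>>>>>>>>>>>>>>> if (rareCondition) {
>>>>>>>>>>>>>>>>>>>>     a.count();      // only this path triggers a job, so only
>>>>>>>>>>>>>>>>>>>> }                   // then would the cost of caching a be paid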
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Another argument for Piotr's point is that lazily
>>>>>>>>>>>>>>>>>>>>> changing the properties of a node affects all downstream
>>>>>>>>>>>>>>>>>>>>> consumers, but does not necessarily have to happen
>>>>>>>>>>>>>>>>>>>>> before these consumers are defined. From a user's
>>>>>>>>>>>>>>>>>>>>> perspective this can be quite confusing:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> a.cache()
>>>>>>>>>>>>>>>>>>>>> d = a.map(...)
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> now b, c and d will consume from a cached operator. In
>>>>>>>>>>>>>>>>>>>>> this case, the user would most likely expect that only d
>>>>>>>>>>>>>>>>>>>>> reads from a cached result.
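>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> With an explicit handle (hypothetical API), that
>>>>>>>>>>>>>>>>>>>>> expectation would be encoded in the program itself:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> b = a.map(...)
>>>>>>>>>>>>>>>>>>>>> c = a.map(...)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> cachedA = a.cache()   // returns a handle, a itself unchanged
>>>>>>>>>>>>>>>>>>>>> d = cachedA.map(...)  // only d opts in to the cached result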
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hey Shaoxuan and Becket,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Can you explain a bit more on what the side effects
>>>>>>>>>>>>>>>>>>>>>>> are? So far my understanding is that such side effects
>>>>>>>>>>>>>>>>>>>>>>> only exist if a table is mutable. Is that the case?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Not only that. There are also performance implications,
>>>>>>>>>>>>>>>>>>>>>> and those are another implicit side effect of using
>>>>>>>>>>>>>>>>>>>>>> `void cache()`. As I wrote before, reading from the
>>>>>>>>>>>>>>>>>>>>>> cache might not always be desirable, thus it can cause
>>>>>>>>>>>>>>>>>>>>>> performance degradation, and I’m fine with that - the
>>>>>>>>>>>>>>>>>>>>>> user’s or optimiser’s choice. What I do not like is
>>>>>>>>>>>>>>>>>>>>>> that this implicit side effect can manifest in a
>>>>>>>>>>>>>>>>>>>>>> completely different part of the code that wasn’t
>>>>>>>>>>>>>>>>>>>>>> touched by the user while he was adding the `void
>>>>>>>>>>>>>>>>>>>>>> cache()` call somewhere else. And even if caching
>>>>>>>>>>>>>>>>>>>>>> improves performance, it’s still a side effect of
>>>>>>>>>>>>>>>>>>>>>> `void cache()`. Almost by definition, `void` methods
>>>>>>>>>>>>>>>>>>>>>> have only side effects. As I wrote before, there are a
>>>>>>>>>>>>>>>>>>>>>> couple of scenarios where this might be undesirable
>>>>>>>>>>>>>>>>>>>>>> and/or unexpected, for example:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>>>>>>>> Table b = …;
>>>>>>>>>>>>>>>>>>>>>> b.cache()
>>>>>>>>>>>>>>>>>>>>>> x = b.join(…)
>>>>>>>>>>>>>>>>>>>>>> y = b.count()
>>>>>>>>>>>>>>>>>>>>>> // ...
>>>>>>>>>>>>>>>>>>>>>> // 100
>>>>>>>>>>>>>>>>>>>>>> // hundred
>>>>>>>>>>>>>>>>>>>>>> // lines
>>>>>>>>>>>>>>>>>>>>>> // of
>>>>>>>>>>>>>>>>>>>>>> // code
>>>>>>>>>>>>>>>>>>>>>> // later
>>>>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…) // this might even be hidden
>>>>>>>>>>>>>>>>>>>>>> // in a different method/file/package/dependency
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Table b = ...
>>>>>>>>>>>>>>>>>>>>>> if (some_condition) {
>>>>>>>>>>>>>>>>>>>>>>   foo(b)
>>>>>>>>>>>>>>>>>>>>>> } else {
>>>>>>>>>>>>>>>>>>>>>>   bar(b)
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> void foo(Table b) {
>>>>>>>>>>>>>>>>>>>>>>   b.cache()
>>>>>>>>>>>>>>>>>>>>>>   // do something with b
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In both of the above examples, `b.cache()` will
>>>>>>>>>>>>>>>>>>>>>> implicitly affect `z = b.filter(…).groupBy(…)` (the
>>>>>>>>>>>>>>>>>>>>>> semantics of the program if the sources are mutable,
>>>>>>>>>>>>>>>>>>>>>> and its performance), which might be far from obvious.
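>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Compare with a handle-returning variant (again just a
>>>>>>>>>>>>>>>>>>>>>> sketch, CachedTable is hypothetical), where the effect
>>>>>>>>>>>>>>>>>>>>>> stays local to foo:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> void foo(Table b) {
>>>>>>>>>>>>>>>>>>>>>>   CachedTable cached = b.cache(); // local handle, no global effect
>>>>>>>>>>>>>>>>>>>>>>   // do something with cached
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>> // the caller's z = b.filter(…).groupBy(…) is untouched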
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On top of that, there is still this argument of mine
>>>>>>>>>>>>>>>>>>>>>> that having a `MaterializedTable` or `CachedTable`
>>>>>>>>>>>>>>>>>>>>>> handle is more flexible for us in the future and for
>>>>>>>>>>>>>>>>>>>>>> the user (as a manual option to bypass cache reads).
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> But Jiangjie is correct, the source table in batching
>>>>>>>>>>>>>>>>>>>>>>> should be immutable. It is the user’s responsibility
>>>>>>>>>>>>>>>>>>>>>>> to ensure it, otherwise even a regular failover may
>>>>>>>>>>>>>>>>>>>>>>> lead to inconsistent results.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Yes, I agree that’s what a perfect world/good
>>>>>>>>>>>>>>>>>>>>>> deployment should look like. But it often isn’t, and
>>>>>>>>>>>>>>>>>>>>>> while I’m not trying to fix this (since the proper fix
>>>>>>>>>>>>>>>>>>>>>> is to support transactions), I’m just trying to
>>>>>>>>>>>>>>>>>>>>>> minimise confusion for the users that are not fully
>>>>>>>>>>>>>>>>>>>>>> aware of what’s going on and operate in a less than
>>>>>>>>>>>>>>>>>>>>>> perfect setup. And if something bites them after adding
>>>>>>>>>>>>>>>>>>>>>> a `b.cache()` call, I want to make sure that they at
>>>>>>>>>>>>>>>>>>>>>> least know all of the places that adding this line can
>>>>>>>>>>>>>>>>>>>>>> affect.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thanks, Piotrek
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks again for the clarification. Some more replies
>>>>>>>>>>>>>>>>>>>>>>> follow below.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> But keep in mind that `.cache()` will/might not only
>>>>>>>>>>>>>>>>>>>>>>>> be used in interactive programming and not only in
>>>>>>>>>>>>>>>>>>>>>>>> batching.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> It is true. Actually, in stream processing, cache()
>>>>>>>>>>>>>>>>>>>>>>> has the same semantics as in batch processing. The
>>>>>>>>>>>>>>>>>>>>>>> semantics are the following:
>>>>>>>>>>>>>>>>>>>>>>> For a table created via a series of computations, save
>>>>>>>>>>>>>>>>>>>>>>> that table for later reference, to avoid re-running
>>>>>>>>>>>>>>>>>>>>>>> the computation logic to regenerate the table. Once
>>>>>>>>>>>>>>>>>>>>>>> the application exits, drop all the caches.
>>>>>>>>>>>>>>>>>>>>>>> These semantics are the same for both batch and stream
>>>>>>>>>>>>>>>>>>>>>>> processing. The difference is that stream applications
>>>>>>>>>>>>>>>>>>>>>>> will only run once, as they are long running. And
>>>>>>>>>>>>>>>>>>>>>>> batch applications may be run multiple times, hence
>>>>>>>>>>>>>>>>>>>>>>> the cache may be created and dropped each time the
>>>>>>>>>>>>>>>>>>>>>>> application runs.
>>>>>>>>>>>>>>>>>>>>>>> Admittedly, there will probably be some resource
>>>>>>>>>>>>>>>>>>>>>>> management requirements for the streaming cached
>>>>>>>>>>>>>>>>>>>>>>> table, such as time-based / size-based retention, to
>>>>>>>>>>>>>>>>>>>>>>> address the infinite data issue. But such requirements
>>>>>>>>>>>>>>>>>>>>>>> do not change the semantics.
>>>>>>>>>>>>>>>>>>>>>>> You are right that interactive programming is just one
>>>>>>>>>>>>>>>>>>>>>>> use case of cache(). It is not the only use case.
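>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> To make those semantics concrete (a sketch only, the
>>>>>>>>>>>>>>>>>>>>>>> exact API is what we are discussing):
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Table t = src.groupBy(...).select(...);
>>>>>>>>>>>>>>>>>>>>>>> t.cache();           // hint: keep the result around
>>>>>>>>>>>>>>>>>>>>>>> int n1 = t.count();  // first access computes t and
>>>>>>>>>>>>>>>>>>>>>>>                      // fills the cache
>>>>>>>>>>>>>>>>>>>>>>> int n2 = t.count();  // later accesses avoid the
>>>>>>>>>>>>>>>>>>>>>>>                      // recomputation
>>>>>>>>>>>>>>>>>>>>>>> // all caches are dropped when the application exits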
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> For me the more important issue is not having the
>>>>>>>>>>>>>>>>>>>>>>>> `void cache()` with side effects.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> This is indeed the key point. The argument around
>>>>>>>>>>>>>>>>>>>>>>> whether cache() should return something already
>>>>>>>>>>>>>>>>>>>>>>> indicates that cache() and materialize() address
>>>>>>>>>>>>>>>>>>>>>>> different issues.
>>>>>>>>>>>>>>>>>>>>>>> Can you explain a bit more on what the side effects
>>>>>>>>>>>>>>>>>>>>>>> are? So far my understanding is that such side effects
>>>>>>>>>>>>>>>>>>>>>>> only exist if a table is mutable. Is that the case?
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I don’t know, probably initially we should make
>>>>>>>>>>>>>>>>>>>>>>>> CachedTable read-only. I don’t find it more confusing
>>>>>>>>>>>>>>>>>>>>>>>> than the fact that a user cannot write to views or
>>>>>>>>>>>>>>>>>>>>>>>> materialised views in SQL, or that a user currently
>>>>>>>>>>>>>>>>>>>>>>>> cannot write to a Table.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I don't think anyone should insert something into a
>>>>>>>>>>>>>>>>>>>>>>> cache. By definition, the cache should only be updated
>>>>>>>>>>>>>>>>>>>>>>> when the corresponding original table is updated. What
>>>>>>>>>>>>>>>>>>>>>>> I am wondering about is, given the following two facts:
>>>>>>>>>>>>>>>>>>>>>>> 1. If and only if a table is mutable (with something
>>>>>>>>>>>>>>>>>>>>>>> like insert()), a CachedTable may have implicit
>>>>>>>>>>>>>>>>>>>>>>> behavior.
>>>>>>>>>>>>>>>>>>>>>>> 2. A CachedTable extends a Table.
>>>>>>>>>>>>>>>>>>>>>>> We can come to the conclusion that a CachedTable is
>>>>>>>>>>>>>>>>>>>>>>> mutable and users can insert into the CachedTable
>>>>>>>>>>>>>>>>>>>>>>> directly. This is what I found confusing.
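>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Spelled out as a hypothetical (insert() does not exist
>>>>>>>>>>>>>>>>>>>>>>> today; this is only the inheritance concern):
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Table t = ...;
>>>>>>>>>>>>>>>>>>>>>>> t.insert(row);             // fact 1: a mutable Table
>>>>>>>>>>>>>>>>>>>>>>> CachedTable c = t.cache();
>>>>>>>>>>>>>>>>>>>>>>> c.insert(row);             // fact 2: inherited from
>>>>>>>>>>>>>>>>>>>>>>>                            // Table - inserting into
>>>>>>>>>>>>>>>>>>>>>>>                            // a cache, which is odd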
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Regarding naming `cache()` vs `materialize()`. One
>>>>>>>>>>>>>>>>>>>>>>>> more explanation of why `materialize()` is more
>>>>>>>>>>>>>>>>>>>>>>>> natural to me is that I think of all “Table”s in the
>>>>>>>>>>>>>>>>>>>>>>>> Table-API as views. They behave the same way as SQL
>>>>>>>>>>>>>>>>>>>>>>>> views; the only difference for me is that their life
>>>>>>>>>>>>>>>>>>>>>>>> scope is short - the current session - which is
>>>>>>>>>>>>>>>>>>>>>>>> limited by the different execution model. That’s why
>>>>>>>>>>>>>>>>>>>>>>>> “caching” a view for me is just materialising it.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> However, I see and understand your point of view.
>>>>>>>>>>>>>>>>>>>>>>>> Coming from DataSet/DataStream and, generally
>>>>>>>>>>>>>>>>>>>>>>>> speaking, the non-SQL world, `cache()` is more
>>>>>>>>>>>>>>>>>>>>>>>> natural. But keep in mind that `.cache()` will/might
>>>>>>>>>>>>>>>>>>>>>>>> not only be used in interactive programming and not
>>>>>>>>>>>>>>>>>>>>>>>> only in batching. But naming is one issue, and not
>>>>>>>>>>>>>>>>>>>>>>>> that critical to me. Especially since once we
>>>>>>>>>>>>>>>>>>>>>>>> implement proper materialised views, we can always
>>>>>>>>>>>>>>>>>>>>>>>> deprecate/rename `cache()` if we deem it necessary.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> For me the more important issue is not having the
>>>>>>>>>>>>>>>>>>>>>>>> `void cache()` with side effects. Exactly for the
>>>>>>>>>>>>>>>>>>>>>>>> reasons that you have mentioned. True: results might
>>>>>>>>>>>>>>>>>>>>>>>> be non-deterministic if the underlying source tables
>>>>>>>>>>>>>>>>>>>>>>>> are changing. The problem is that `void cache()`
>>>>>>>>>>>>>>>>>>>>>>>> implicitly changes the semantics of subsequent uses
>>>>>>>>>>>>>>>>>>>>>>>> of the cached/materialized Table. It can cause a
>>>>>>>>>>>>>>>>>>>>>>>> “wtf” moment for a user if he inserts a “b.cache()”
>>>>>>>>>>>>>>>>>>>>>>>> call in some place in his code and suddenly some
>>>>>>>>>>>>>>>>>>>>>>>> other random places are behaving differently. If
>>>>>>>>>>>>>>>>>>>>>>>> `materialize()` or `cache()` returns a Table handle,
>>>>>>>>>>>>>>>>>>>>>>>> we force the user to explicitly use the cache, which
>>>>>>>>>>>>>>>>>>>>>>>> removes the “random” part from the “suddenly some
>>>>>>>>>>>>>>>>>>>>>>>> other random places are behaving differently”.
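>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Side by side, as a sketch (the handle type is
>>>>>>>>>>>>>>>>>>>>>>>> hypothetical):
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> // implicit: this one line silently changes what
>>>>>>>>>>>>>>>>>>>>>>>> // distant code reads from
>>>>>>>>>>>>>>>>>>>>>>>> b.cache();
>>>>>>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…);  // maybe the cache, maybe not
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> // explicit: only code holding the handle opts in
>>>>>>>>>>>>>>>>>>>>>>>> CachedTable cb = b.cache();
>>>>>>>>>>>>>>>>>>>>>>>> z = cb.filter(…).groupBy(…); // clearly reads the cache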
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> This argument, and the others that I’ve raised
>>>>>>>>>>>>>>>>>>>>>>>> (greater flexibility / allowing the user to
>>>>>>>>>>>>>>>>>>>>>>>> explicitly bypass the cache), are independent of the
>>>>>>>>>>>>>>>>>>>>>>>> `cache()` vs `materialize()` discussion.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Does that mean one can also insert into the
>>>>>>>>>>>>>>>>>>>>>>>>> CachedTable? This sounds pretty confusing.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I don’t know, probably initially we should make
>>>>>>>>>>>>>>>>>>>>>>>> CachedTable read-only. I don’t find it more confusing
>>>>>>>>>>>>>>>>>>>>>>>> than the fact that a user cannot write to views or
>>>>>>>>>>>>>>>>>>>>>>>> materialised views in SQL, or that a user currently
>>>>>>>>>>>>>>>>>>>>>>>> cannot write to a Table.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I agree with @Becket that `cache()` and
>>>>>>>>>>>>>>>>>>>>>>>>> `materialize()` should be considered as two
>>>>>>>>>>>>>>>>>>>>>>>>> different methods, where the latter one is more
>>>>>>>>>>>>>>>>>>>>>>>>> sophisticated.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> According to my understanding, the initial idea is
>>>>>>>>>>>>>>>>>>>>>>>>> just to introduce a simple cache or persist
>>>>>>>>>>>>>>>>>>>>>>>>> mechanism, but as the TableAPI is a high-level API,
>>>>>>>>>>>>>>>>>>>>>>>>> it’s natural for us to think in a SQL way.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Maybe we can add the `cache()` method to the DataSet
>>>>>>>>>>>>>>>>>>>>>>>>> API and force users to translate a Table to a
>>>>>>>>>>>>>>>>>>>>>>>>> DataSet before caching it. Then the users should
>>>>>>>>>>>>>>>>>>>>>>>>> manually register the cached dataset as a table
>>>>>>>>>>>>>>>>>>>>>>>>> again (we may need some table replacement mechanisms
>>>>>>>>>>>>>>>>>>>>>>>>> for datasets with an identical schema but different
>>>>>>>>>>>>>>>>>>>>>>>>> contents here). After all, it’s the dataset rather
>>>>>>>>>>>>>>>>>>>>>>>>> than the dynamic table that needs to be cached,
>>>>>>>>>>>>>>>>>>>>>>>>> right?
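>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Roughly like this - DataSet#cache() is the
>>>>>>>>>>>>>>>>>>>>>>>>> hypothetical part, the conversion and registration
>>>>>>>>>>>>>>>>>>>>>>>>> calls exist today:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> DataSet<Row> ds = tEnv.toDataSet(t, Row.class);
>>>>>>>>>>>>>>>>>>>>>>>>> DataSet<Row> cached = ds.cache();         // hypothetical
>>>>>>>>>>>>>>>>>>>>>>>>> tEnv.registerDataSet("t_cached", cached); // back into
>>>>>>>>>>>>>>>>>>>>>>>>>                                           // the catalog
>>>>>>>>>>>>>>>>>>>>>>>>> Table tCached = tEnv.scan("t_cached");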
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>> Xingcan
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Piotrek and Jark,
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback and explanation. Those are
>>>>>>>>>>>>>>>>>>>>>>>>>> good arguments. But I think those arguments are
>>>>>>>>>>>>>>>>>>>>>>>>>> mostly about materialized views. Let me try to
>>>>>>>>>>>>>>>>>>>>>>>>>> explain the reason I believe cache() and
>>>>>>>>>>>>>>>>>>>>>>>>>> materialize() are different.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> I think cache() and materialize() have quite
>>>>>>>>>>>>>>>>>>>>>>>>>> different implications. An analogy I can think of
>>>>>>>>>>>>>>>>>>>>>>>>>> is save() vs publish(). When users call cache(), it
>>>>>>>>>>>>>>>>>>>>>>>>>> is just like they are saving an intermediate result
>>>>>>>>>>>>>>>>>>>>>>>>>> as a draft of their work; this intermediate result
>>>>>>>>>>>>>>>>>>>>>>>>>> may not have any realistic meaning. Calling cache()
>>>>>>>>>>>>>>>>>>>>>>>>>> does not mean users want to publish the cached
>>>>>>>>>>>>>>>>>>>>>>>>>> table in any manner. But when users call
>>>>>>>>>>>>>>>>>>>>>>>>>> materialize(), that means "I have something
>>>>>>>>>>>>>>>>>>>>>>>>>> meaningful to be reused by others", so users need
>>>>>>>>>>>>>>>>>>>>>>>>>> to think about the validation, update & versioning,
>>>>>>>>>>>>>>>>>>>>>>>>>> lifecycle of the result, etc.
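>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> In sketch form (both calls here are hypothetical
>>>>>>>>>>>>>>>>>>>>>>>>>> shapes, not proposals):
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Table draft = t.cache();       // private draft,
>>>>>>>>>>>>>>>>>>>>>>>>>>                                // dropped on exit
>>>>>>>>>>>>>>>>>>>>>>>>>> t.materialize("daily_report"); // published: named,
>>>>>>>>>>>>>>>>>>>>>>>>>>                                // versioned, shared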
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Piotrek's suggestions on variations of the
>>>>>>>>>>>>>>>>>>>>>>>>>> materialize() method are very useful. It would be
>>>>>>>>>>>>>>>>>>>>>>>>>> great if Flink had them. The concept of a
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized view is actually a pretty big feature,
>>>>>>>>>>>>>>>>>>>>>>>>>> not to mention the related stuff like the
>>>>>>>>>>>>>>>>>>>>>>>>>> triggers/hooks you mentioned earlier. I think the
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized view itself should be discussed in a
>>>>>>>>>>>>>>>>>>>>>>>>>> more thorough and systematic manner. And I find
>>>>>>>>>>>>>>>>>>>>>>>>>> that discussion is kind of orthogonal to, and way
>>>>>>>>>>>>>>>>>>>>>>>>>> beyond, the interactive programming experience.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> The example you gave was interesting. I still have
>>>>>>>>>>>>>>>>>>>>>>>>>> some questions, though.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> Table source = … // some source that scans files
>>>>>>>>>>>>>>>>>>>>>>>>>>> // from a directory “/foo/bar/“
>>>>>>>>>>>>>>>>>>>>>>>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….;
>>>>>>>>>>>>>>>>>>>>>>>>>>> Table t2 = t1.materialize() // (or `cache()`)
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.count() // initialise cache (if it’s lazily initialised)
>>>>>>>>>>>>>>>>>>>>>>>>>>> int a1 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> int b1 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> // something in the background (or we trigger it)
>>>>>>>>>>>>>>>>>>>>>>>>>>> // writes new files to /foo/bar
>>>>>>>>>>>>>>>>>>>>>>>>>>> int a2 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> int b2 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.refresh() // possible future extension, not to
>>>>>>>>>>>>>>>>>>>>>>>>>>> // be implemented in the initial version
>>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> what if someone else added some more files to
>>>>>>>>>>>>>>>>>>>>>>>>>> /foo/bar at this point? In that case, a3 won't
>>>>>>>>>>>>>>>>>>>>>>>>>> equal b3, and the result becomes non-deterministic,
>>>>>>>>>>>>>>>>>>>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>>> int a3 = t1.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> int b3 = t2.count()
>>>>>>>>>>>>>>>>>>>>>>>>>>> t2.drop() // another possible future extension,
>>>>>>>>>>>>>>>>>>>>>>>>>>> // manual “cache” dropping
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> When we talk about interactive programming, in most
>>>>>>>>>>>>>>>>>>>>>>>>>> cases we are talking about batch applications. A
>>>>>>>>>>>>>>>>>>>>>>>>>> fundamental assumption of such cases is that the
>>>>>>>>>>>>>>>>>>>>>>>>>> source data is complete before the data processing
>>>>>>>>>>>>>>>>>>>>>>>>>> begins, and the data will not change during the
>>>>>>>>>>>>>>>>>>>>>>>>>> data processing. IMO, if additional rows need to be
>>>>>>>>>>>>>>>>>>>>>>>>>> added to some source during the processing, it
>>>>>>>>>>>>>>>>>>>>>>>>>> should be done in ways like unioning the source
>>>>>>>>>>>>>>>>>>>>>>>>>> with another table containing the rows to be added,
>>>>>>>>>>>>>>>>>>>>>>>>>> as sketched below.
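>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> (A sketch; the table names are illustrative:)
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Table base = tEnv.scan("snapshot");  // immutable
>>>>>>>>>>>>>>>>>>>>>>>>>>                                      // during the run
>>>>>>>>>>>>>>>>>>>>>>>>>> Table delta = tEnv.scan("new_rows"); // the additions
>>>>>>>>>>>>>>>>>>>>>>>>>> Table all = base.union(delta);       // explicit and
>>>>>>>>>>>>>>>>>>>>>>>>>>                                      // deterministic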
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> There are a few cases where computations are
>>>>>>>>>>>>>>>>>>>>>>>>>> executed repeatedly on a changing data source.
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> For example, people may run an ML training job
>>>>>>>>>>>>>>>>>>>>>>>>>> every hour with the samples newly added in the past
>>>>>>>>>>>>>>>>>>>>>>>>>> hour. In that case, the source data between runs
>>>>>>>>>>>>>>>>>>>>>>>>>> will indeed change. But still, the data remains
>>>>>>>>>>>>>>>>>>>>>>>>>> unchanged within one run. And usually in that case
>>>>>>>>>>>>>>>>>>>>>>>>>> the result will need versioning, i.e. for a given
>>>>>>>>>>>>>>>>>>>>>>>>>> result, it tells us that the result was derived
>>>>>>>>>>>>>>>>>>>>>>>>>> from the source data as of a certain timestamp.
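>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> For instance (all names here are made up for
>>>>>>>>>>>>>>>>>>>>>>>>>> illustration):
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> // each hourly run reads an immutable snapshot and
>>>>>>>>>>>>>>>>>>>>>>>>>> // versions its output by the snapshot's timestamp
>>>>>>>>>>>>>>>>>>>>>>>>>> Table samples = tEnv.scan("samples_2018120310");
>>>>>>>>>>>>>>>>>>>>>>>>>> Table model = train(samples);     // hypothetical helper
>>>>>>>>>>>>>>>>>>>>>>>>>> write(model, "model_2018120310"); // hypothetical helper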
>>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>> Another example is something like a data warehouse.
>>>>>>>>>>>>>>>>>>>>>>>>>> In this case, there are a few sources of
>>>>>>>>>>>>>>>>>>>>>>>>>> original/raw data. On top of those sources, many
>>>>>>>>>>>>>>>>>>>>>>>>>> materialized views / queries / reports / dashboards
>>>>>>>>>>>>>>>>>>>>>>>>>> can be created to generate derived data. That
>>>>>>>>>>>>>>>>>>>>>>>>>> derived data needs to be updated when the
>>>>>>>>>>>>>>>>>>>>>>>>>> underlying original data changes. In that case, the
>>>>>>>>>>>>>>>>>>>>>>>>>> processing logic that derives the data needs to be
>>>>>>>>>>>>>>>>>>>>>>>>>> executed repeatedly to update those reports/views.
>>>>>>>>>>>>>>>>>>>>>>>>>> Again, all that derived data also needs to ha