[ 
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052101#comment-17052101
 ] 

Stephan Ewen commented on FLINK-14807:
--------------------------------------

Nice to see this lively discussion. I agree that we should come up with a 
proper solution and work backwards from there.
It would also be great to find a way that works across batch and streaming. Not 
sure how feasible that is, but maybe we can think along those lines.

About the ideas proposed so far:

* Concerning exactly-once semantics: That would be nice, but it would 
inherently introduce a "checkpoint interval delay", because we can only feed 
back completed results. Maybe that is okay in the streaming world; for batch 
there is anyway only the "end of job" state.
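
To make that delay concrete, here is a toy model (not Flink code; all names 
are illustrative) of results being gated on checkpoint completion: records 
emitted between checkpoints stay buffered and only become visible to the 
client once the checkpoint covering them completes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the "checkpoint interval delay": results are only fed back
// once the checkpoint that covers them has completed.
public class CheckpointedResultBuffer<T> {
    private final List<T> pending = new ArrayList<>();           // since last checkpoint
    private final Map<Long, List<T>> inFlight = new HashMap<>(); // checkpointId -> batch
    private final List<T> committed = new ArrayList<>();         // visible to clients

    public void emit(T record) {
        pending.add(record);
    }

    // Checkpoint `id` is triggered: snapshot the pending results.
    public void snapshot(long id) {
        inFlight.put(id, new ArrayList<>(pending));
        pending.clear();
    }

    // Checkpoint `id` completed: only now its results become visible.
    public void notifyCheckpointComplete(long id) {
        List<T> batch = inFlight.remove(id);
        if (batch != null) {
            committed.addAll(batch);
        }
    }

    public List<T> fetchCommitted() {
        return new ArrayList<>(committed);
    }
}
```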

* Concerning master-failover: I think we can get away with not supporting that 
in the first version, as long as we point out that limitation.

* Concerning task-failover: That sounds like expected functionality to me.

* Server sockets as the basic data exchange plane are tricky. Lost connections, 
retries, etc. There needs to be a more sophisticated protocol on top of that in 
any case.

* The GlobalAggregateManager is itself actually a pretty bad hack at the 
moment, which I would hope to replace with the {{OperatorCoordinators}} in the 
future.

One thing we could try is to enhance the accumulators with blob-server 
offload.
* This would be similar to how the RPC service works. Small accumulator 
results go directly in an RPC message; larger (aggregated) results may be 
offloaded to the blob storage.
* We would send new accumulators with the checkpoint (or checkpoint commit), 
giving exactly once semantics.
* Clients would pull the accumulators; the REST server would need to pull them 
from blob storage if necessary.
* That might work, but it involves a lot of indirection / out-of-band transfer 
as soon as results get larger, which might be expected to some extent, to keep 
the memory footprint of the processes small.
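
A minimal sketch of the inline-vs-offload decision (the class names, the 
size threshold, and the map standing in for the blob server are all 
assumptions, not Flink's actual classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: small accumulator results travel inline in the RPC
// message; larger ones are offloaded to blob storage and only a key is shipped.
public class AccumulatorOffload {
    static final int INLINE_LIMIT = 1024; // bytes; the threshold is an assumption

    // Stand-in for the blob server: key -> stored bytes.
    static final Map<String, byte[]> BLOB_STORE = new HashMap<>();

    // What actually goes into the RPC message.
    public static final class AccumulatorRef {
        public final byte[] inlineData; // null if offloaded
        public final String blobKey;    // null if inline
        AccumulatorRef(byte[] inlineData, String blobKey) {
            this.inlineData = inlineData;
            this.blobKey = blobKey;
        }
    }

    // Sender side: decide between inline transfer and blob offload.
    public static AccumulatorRef store(byte[] serialized) {
        if (serialized.length <= INLINE_LIMIT) {
            return new AccumulatorRef(serialized, null);
        }
        String key = UUID.randomUUID().toString();
        BLOB_STORE.put(key, serialized);
        return new AccumulatorRef(null, key);
    }

    // Client / REST-server side: resolve the reference, pulling from
    // blob storage only when the payload was offloaded.
    public static byte[] resolve(AccumulatorRef ref) {
        return ref.inlineData != null ? ref.inlineData : BLOB_STORE.get(ref.blobKey);
    }
}
```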

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect-draft.patch, table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink 
> job unless they specify a sink explicitly and then fetch the data from that 
> sink via its api (e.g. write to an hdfs sink, then read the data from hdfs). 
> However, most of the time users just want to get the data and do whatever 
> processing they want. So it is very necessary for Flink to provide an api 
> Table#collect for this purpose. 
>  
> Other apis such as Table#head and Table#print would also be helpful.  
>  
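
For reference, a hypothetical sketch of the client-facing API the issue asks 
for (the `Table` class here is a toy stand-in with plain string rows; the 
method names follow the issue, not any decided Flink API):

```java
import java.util.Iterator;
import java.util.List;

// Toy stand-in for a table handle backed by some job result, illustrating
// the collect/head/print surface the issue proposes.
public class TableCollectSketch {
    public static final class Table {
        private final List<String> rows;
        public Table(List<String> rows) { this.rows = rows; }

        // collect(): pull all rows back to the client.
        public Iterator<String> collect() { return rows.iterator(); }

        // head(n): convenience on top of collect().
        public List<String> head(int n) {
            return rows.subList(0, Math.min(n, rows.size()));
        }

        // print(): convenience for interactive use.
        public void print() { rows.forEach(System.out::println); }
    }
}
```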



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
