[ 
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049884#comment-17049884
 ] 

Caizhi Weng commented on FLINK-14807:
-------------------------------------

Hi dear Flink community,

After some implementation and thinking I discover that the core of this problem 
is actually the communication and coordination between client, JM and 
operators. {{updateGlobalAggregate}} in JobMaster has provided us a way to 
coordinate operators, so if we can add a REST API so that the client can also 
call {{updateGlobalAggregate}} we can implement {{Table#collect}} without 
modifying JM. As these aggregate functions will run in JM, they're free to send 
and receive messages to and from both clients and operators.

We can use two aggregate functions to achieve this. One aggregate function is 
submitted by the sink to update the socket server address, the other aggregate 
function is submitted by the client to check if the socket server address has 
already been recorded and fetch results if so. The socket server address will 
be the value to be maintained by both client and operators.

For the exactly once / at least once semantics, I think [~becket_qin] raises a 
very good point. We can push the complexity to client (to the implementation of 
the iterator in client).

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect.png
>
>
> Currently, it is very unconvinient for user to fetch data of flink job unless 
> specify sink expclitly and then fetch data from this sink via its api (e.g. 
> write to hdfs sink, then read data from hdfs). However, most of time user 
> just want to get the data and do whatever processing he want. So it is very 
> necessary for flink to provide api Table#collect for this purpose. 
>  
> Other apis such as Table#head, Table#print is also helpful.  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to