[ 
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044034#comment-17044034
 ] 

Caizhi Weng edited comment on FLINK-14807 at 2/25/20 10:24 AM:
---------------------------------------------------------------

Thanks for the comment [~godfreyhe]. I'll list more details about my design 
below.
 * How can the sink tell the REST server its address and port?
 >> This is the hardest part of the design. I haven't come up with a 
 >> really satisfying solution yet. I currently see three options:
 ** Option A. 
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
 will introduce the Operator Coordinator, which solves this issue cleanly. But 
according to [~jqin] we won't have this feature until 1.11.
 ** Option B. We can use accumulators to tell the job manager the address and 
port of the socket server. But as accumulators are sent with heartbeats, and the 
default heartbeat interval is 10s, this would add noticeable latency to small 
jobs.
 ** Option C. Extract the TaskConfig from the JobGraph in the job manager and 
insert the server information into it. This requires the socket server to start 
in the job manager instead of in the sink, and it seems quite hacky...
 * What if a job without ordering restarts?
 >> As streaming jobs are backed by checkpoints, this is not a problem. For 
 >> batch jobs I'm afraid we'll have to introduce a special element into the 
 >> resulting iterator indicating that the previously delivered results are now 
 >> invalid.
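The options above differ only in how the address reaches the client; the data path itself is a plain socket opened by the sink. The following is a minimal, self-contained sketch of that data path in the style of Option B: the sink binds to an ephemeral port and would publish the resulting "host:port" string through an accumulator (simulated here by a println). All class and method names are illustrative, not Flink APIs.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the sink opens a socket server on an ephemeral port
// and advertises its address; a client later connects and streams results.
// In the real design the advertised string would travel via an accumulator.
public class CollectSinkSketch {
    public static void main(String[] args) throws Exception {
        // Sink side: bind to port 0 so the OS picks a free port.
        ServerSocket server = new ServerSocket(0);
        String advertised = "localhost:" + server.getLocalPort();
        System.out.println("advertised address = " + advertised);

        // Serve one buffered result batch to the first client that connects.
        Thread sink = new Thread(() -> {
            try (Socket s = server.accept();
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
                out.println("row-1");
                out.println("row-2");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        sink.start();

        // Client side: connect using the advertised address and read results
        // until the sink closes the connection.
        try (Socket s = new Socket("localhost", server.getLocalPort());
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println("received " + line);
            }
        }
        sink.join();
        server.close();
    }
}
```

Binding to port 0 is what makes the address-discovery problem real: the port is only known after the task starts, so some channel back to the job manager is unavoidable.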


was (Author: tsreaper):
Thanks for the comment [~godfreyhe]. I'll list more details about my design 
below.
 * How can sink tell the REST server its address and port?
    >> The sink indeed does not know the address and port of the REST server. 
I'm going to use accumulators to achieve this, as accumulator values are sent 
to the job manager with each heartbeat message.
 * What if a job without ordering restarts?
    >> As streaming jobs are backed by checkpoints, this is not a problem. For 
batch jobs I'm afraid we'll have to introduce a special element into the 
resulting iterator indicating that the previously provided results are now 
invalid.
 * What if the job manager fails and is restarted by an HA service?
    >> Sinks and job managers communicate via accumulators, while clients and 
job managers communicate via RestClusterClient. These existing mechanisms 
already account for this failure case.
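Both versions of the comment propose a "special element" in the result iterator to signal that a batch job restarted and earlier results must be discarded. A minimal sketch of how a client could consume such a stream (the marker value and all names here are illustrative, not part of any Flink API):

```java
import java.util.*;

// Hypothetical sketch of the "special element" idea for batch restarts:
// when the job restarts, the sink emits a marker telling the client that
// everything received so far is invalid and must be dropped.
public class InvalidatingIteratorSketch {
    // Illustrative marker; a real protocol would use a dedicated element type.
    static final String INVALIDATE = "__INVALIDATE__";

    // Drains a raw result stream, discarding everything before the last marker.
    static List<String> materialize(Iterator<String> raw) {
        List<String> results = new ArrayList<>();
        while (raw.hasNext()) {
            String element = raw.next();
            if (INVALIDATE.equals(element)) {
                results.clear();  // previous results are now invalid
            } else {
                results.add(element);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // The job emitted a, b, then restarted and re-emitted a, b, c.
        List<String> stream = Arrays.asList("a", "b", INVALIDATE, "a", "b", "c");
        System.out.println(materialize(stream.iterator()));  // [a, b, c]
    }
}
```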

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job 
> unless they specify a sink explicitly and then fetch the data from that sink 
> via its API (e.g. write to an HDFS sink, then read the data from HDFS). 
> However, most of the time users just want to get the data and do whatever 
> processing they want with it. So it is necessary for Flink to provide a 
> Table#collect API for this purpose. 
>  
> Other APIs such as Table#head and Table#print would also be helpful.  
>  
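To make the requested surface concrete, here is a toy sketch of the call pattern the issue asks for. Neither this Table interface nor the in-memory implementation is Flink code; it only illustrates collect() returning rows directly to the caller instead of routing through an external sink.

```java
import java.util.*;

// Hypothetical sketch of the proposed API surface: Table#collect returns an
// iterator over rows, and Table#head is a convenience on top of it.
public class TableCollectSketch {
    interface Table {
        Iterator<List<Object>> collect();     // proposed: fetch all rows
        default List<Object> head() {         // proposed convenience method
            Iterator<List<Object>> it = collect();
            return it.hasNext() ? it.next() : null;
        }
    }

    public static void main(String[] args) {
        // Toy table backed by an in-memory list, just to show the call pattern.
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList(1, "hello"),
                Arrays.<Object>asList(2, "world"));
        Table table = rows::iterator;

        System.out.println("head = " + table.head());
        table.collect().forEachRemaining(row -> System.out.println("row = " + row));
    }
}
```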



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
