[
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044034#comment-17044034
]
Caizhi Weng edited comment on FLINK-14807 at 2/25/20 10:24 AM:
---------------------------------------------------------------
Thanks for the comment [~godfreyhe]. I'll list more details about my design
below.
* How can the sink tell the REST server its address and port?
>> This is the hardest part of the design. I actually haven't come up with a
>> very good solution. I now have three options listed below:
** Option A.
[FLIP-27|https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface]
will introduce Operator Coordinator which can perfectly solve this issue. But
according to [~jqin] we won't have this feature until 1.11.
** Option B. We can use accumulators to tell the job manager the address and
port of the socket server. But as accumulators are sent with heartbeats and the
default interval between heartbeats is 10s, this will greatly impact small
jobs.
** Option C. Extract TaskConfig from JobGraph in the job manager and insert the
server information into it. This requires the socket server to start in the job
manager instead of in the sink, and it seems to be quite hacky...
* What if a job without ordering restarts?
>> As streaming jobs are backed by checkpoints this is not a problem. For
>> batch jobs I'm afraid we'll have to introduce a special element in the
>> resulting iterator indicating that the previous results provided are now
>> invalid.
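
To make the "special element" idea concrete, here is a minimal sketch in plain Java. It is only an illustration under my own assumptions: RestartMarkerSketch, Element and collectValid are hypothetical names and are not part of Flink or of the proposed Table#collect API. The result stream carries either a data row or a restart marker, and the client drops everything it has buffered when it sees the marker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch; none of these names exist in Flink. */
public class RestartMarkerSketch {

    /** One element of the result stream: either a data row or a restart marker. */
    static final class Element {
        final String row;       // null for a restart marker
        final boolean restart;  // true means "results sent so far are invalid"

        private Element(String row, boolean restart) {
            this.row = row;
            this.restart = restart;
        }

        static Element row(String value) {
            return new Element(value, false);
        }

        static Element restartMarker() {
            return new Element(null, true);
        }
    }

    /** Drains the iterator, discarding rows that precede a restart marker. */
    static List<String> collectValid(Iterator<Element> elements) {
        List<String> rows = new ArrayList<>();
        while (elements.hasNext()) {
            Element e = elements.next();
            if (e.restart) {
                rows.clear(); // the job restarted, so earlier rows are invalid
            } else {
                rows.add(e.row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // The batch job emits "a" and "b", restarts, then emits its full result.
        List<Element> stream = Arrays.asList(
                Element.row("a"), Element.row("b"),
                Element.restartMarker(),
                Element.row("a"), Element.row("b"), Element.row("c"));
        System.out.println(collectValid(stream.iterator())); // prints [a, b, c]
    }
}
```

A client that consumes rows lazily instead of buffering them could not simply clear a list, so it would have to surface the marker to the caller. That trade-off is not addressed by this sketch.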
was (Author: tsreaper):
Thanks for the comment [~godfreyhe]. I'll list more details about my design
below.
* How can the sink tell the REST server its address and port?
>> The sink indeed does not know the address and port of the REST server. I'm
going to use accumulators to achieve this, as accumulator values are sent to
the job manager in each heartbeat message.
* What if a job without ordering restarts?
>> As streaming jobs are backed by checkpoints this is not a problem. For
batch jobs I'm afraid we'll have to introduce a special element in the
resulting iterator indicating that the previous results provided are now
invalid.
* What if the job manager fails and is restarted by an HA service?
>> Sinks and job managers communicate through accumulators, while clients and
job managers communicate through RestClusterClient. These existing mechanisms
already take this issue into account.
> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
> Key: FLINK-14807
> URL: https://issues.apache.org/jira/browse/FLINK-14807
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Reporter: Jeff Zhang
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job
> unless they specify a sink explicitly and then fetch the data from this sink
> via its API (e.g. write to an HDFS sink, then read the data from HDFS).
> However, most of the time users just want to get the data and do whatever
> processing they want. So it is very necessary for Flink to provide a
> Table#collect API for this purpose.
>
> Other APIs such as Table#head and Table#print would also be helpful.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)