[
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064983#comment-17064983
]
Stephan Ewen commented on FLINK-14807:
--------------------------------------
Thanks a lot, that design document has made things clearer for me.
I think that a few of the design points are good, and we should go that way:
- Client-initiated pulls
- Sink decides when to respond and with how much data.
- On exactly-once, sink pushes only after checkpoint is complete
- On at-least-once, sink can push early
- Tokens to deduplicate. Can the "token" actually be a sequence number that
is a composite of execution attempt and record number?
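To make that last point concrete, such a token could be as simple as the following
(purely a sketch; ResultToken and the field names are made up for illustration, not
existing Flink classes):
{code:java}
// Illustrative composite token: execution attempt + record offset.
final class ResultToken implements Comparable<ResultToken> {
    final int executionAttempt;  // attempt number of the sink execution
    final long recordOffset;     // offset of the first record of the next batch

    ResultToken(int executionAttempt, long recordOffset) {
        this.executionAttempt = executionAttempt;
        this.recordOffset = recordOffset;
    }

    // A token from a newer attempt, or with a higher offset, supersedes older ones,
    // which is what lets the client deduplicate re-sent batches after a failover.
    @Override
    public int compareTo(ResultToken other) {
        int byAttempt = Integer.compare(executionAttempt, other.executionAttempt);
        return byAttempt != 0 ? byAttempt : Long.compare(recordOffset, other.recordOffset);
    }
}
{code}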
What I am not a big fan of is using the GlobalAggregator for this:
- The GlobalAggregator itself is already a hack. The fact that you transfer
the function with every RPC call, etc., is not cleanly designed.
- You would open a socket connection in the aggregation method, which is
executed in the RPC system. That violates the basic principles of the RPC/actor
approach: no blocking operations (including I/O) in the RPC threads.
- Especially when you need to wait for the sink to actually produce
something, you would block on the socket I/O operation, stalling the RPC
system and crashing the cluster.
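Just to illustrate the rule this violates: blocking work is expected to be handed off
to a separate executor instead of running on the RPC main thread, roughly like this
(a sketch only; SinkHandle and readBatchBlocking() are placeholders, not real Flink
interfaces):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class NonBlockingFetchSketch {

    /** Placeholder for whatever handle actually talks to the sink over the socket. */
    interface SinkHandle {
        byte[] readBatchBlocking();  // performs blocking socket I/O
    }

    // Doing the blocking read directly inside an RPC/aggregation method would stall
    // the actor's main thread. Offload it to a dedicated I/O executor instead and
    // let the caller complete asynchronously via the returned future.
    static CompletableFuture<byte[]> fetchNextBatch(SinkHandle sink, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(sink::readBatchBlocking, ioExecutor);
    }
}
{code}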
I think we can go one of two ways:
(1) Use the OperatorCoordinator, like [~becket_qin] suggested. You can apply
backpressure there pretty easily if you assume that the coordinator always
polls the data from the sink (it sends an event to the sink asking it to respond
with the next "batch"). We need to think a bit more about the REST/Coordinator
integration. Maybe an extended interface that listens to REST requests and
gives you a CompletableFuture<ByteBuffer> or so to set your response on.
(2) A more proper "result proxy" component in the job manager. I know, I often
ask not to put things into the core but rather build "libraries/plugins", but
this might be a case that warrants a dedicated component. Or, wild thought,
does it actually make sense to use the normal network stack for that? The
result proxy on the JobManager would be the Netty client, and the sink would
have an IntermediateResult and output data in the normal way.
==> I am personally leaning slightly towards re-using the OperatorCoordinators,
because if we ever need checkpointing integration (for tracking progress / JM
failover, etc.) we already have the facilities there for that, as we do for task
failover notifications, etc.
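To make (1) a bit more concrete, the coordinator-side plumbing could look roughly
like this (a sketch under the assumptions above; CollectCoordinator,
handleClientFetch() and requestBatchFromSink() are placeholder names, not existing
Flink APIs):
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class CollectCoordinator {

    /** Client (REST) requests waiting for the sink to deliver the next batch. */
    private final Queue<CompletableFuture<ByteBuffer>> pendingClientRequests = new ArrayDeque<>();

    /** Called by the REST handler; the returned future is completed once the sink responds. */
    public synchronized CompletableFuture<ByteBuffer> handleClientFetch() {
        CompletableFuture<ByteBuffer> response = new CompletableFuture<>();
        pendingClientRequests.add(response);
        requestBatchFromSink();  // ask the sink (via an operator event) for the next batch
        return response;
    }

    /** Called when the sink answers with an event carrying the serialized batch. */
    public synchronized void handleSinkBatch(ByteBuffer batch) {
        CompletableFuture<ByteBuffer> response = pendingClientRequests.poll();
        if (response != null) {
            response.complete(batch);
        }
    }

    private void requestBatchFromSink() {
        // placeholder: would go through the coordinator's event gateway to the sink task
    }
}
{code}
Backpressure then falls out naturally, because the coordinator only asks the sink for
the next batch when a client request is actually pending.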
One last point I would like to bring up is how the client gets updates about
the job execution. While the client is fetching result data, in streaming, do
we still need to get notifications about changes in the job status? Or do we
assume that the client issues a separate request once the data fetch has
reached EOF?
> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
> Key: FLINK-14807
> URL: https://issues.apache.org/jira/browse/FLINK-14807
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Reporter: Jeff Zhang
> Assignee: Caizhi Weng
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> Attachments: table-collect-draft.patch, table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job unless
> they specify a sink explicitly and then fetch the data from that sink via its API (e.g.
> write to an HDFS sink, then read the data from HDFS). However, most of the time users
> just want to get the data and do whatever processing they want with it. So it is very
> necessary for Flink to provide a Table#collect API for this purpose.
>
> Other APIs such as Table#head and Table#print would also be helpful.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)