[
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053298#comment-17053298
]
Caizhi Weng commented on FLINK-14807:
-------------------------------------
Hi [~sewen],
Thanks for the reply. I think we might need to sync on the use cases and the
implementation ideas of {{Table#collect}}.
*Use cases we expected*
We expect {{Table#collect}} to be used for both batch and streaming jobs, and
to support iterating through large results.
*Why we came up with an implementation based on back pressure*
# As Flink does not have (or rely on) a dedicated storage service (unlike Hive,
which relies on HDFS, or Kafka, which is itself a storage service), we cannot
store the (possibly infinite) results anywhere, not even in the blob storage
(it would eventually be filled up by the results of a never-ending streaming
job). So we have to rely on back pressure to limit the amount of results
generated but not yet consumed by the clients.
# As the sink will back-pressure the whole job, it needs to be told when to
send new results to the client and release the pressure. That is to say, there
must be a way for the clients to communicate with the task managers. But under
some network configurations (for example in a Yarn or k8s cluster) clients
can't directly communicate with TMs, so job managers must forward the messages
from the clients to the sinks.
# Currently there is no way for JMs to proactively talk to sinks.
{{OperatorCoordinators}} is exactly the feature we want, but it hasn't been
implemented yet, so we create a socket server in the sink function to do the
communication work (we cannot introduce a new RPC interface for this, as its
functionality would overlap with {{OperatorCoordinators}}). This way we do not
need to modify the JMs, and we can conveniently refactor this once
{{OperatorCoordinators}} is implemented.
# Currently there is no way for clients / sinks to proactively talk to JMs
either. We don't want to introduce a new feature for this, and currently
{{GlobalAggregateManager}} is the only RPC call that achieves this
functionality. We would of course prefer {{OperatorCoordinators}}, but it is
not implemented yet.
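Since {{OperatorCoordinators}} does not exist yet, the communication runs over a plain socket opened by the sink. The sketch below (class and method names are hypothetical, not taken from the patch) only illustrates the mechanism: the sink binds an ephemeral port that the JM can forward client requests to, and answers one request per connection.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch of the socket server a sink function could open.
// The actual patch differs; this only shows the minimal shape.
class SinkSocketServer implements AutoCloseable {
    private final ServerSocket server;

    SinkSocketServer() throws IOException {
        // Port 0: let the OS pick a free port, since the sink may run on
        // any TM. The chosen address would then be reported to the JM.
        this.server = new ServerSocket(0);
    }

    int port() {
        return server.getLocalPort();
    }

    // Serve exactly one request line and acknowledge it. A real sink
    // would loop in a dedicated thread and answer with result batches.
    String serveOne() throws IOException {
        try (Socket socket = server.accept();
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            String request = in.readLine();
            out.println("ACK " + request);
            return request;
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
    }
}
```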
*What if the connection to the server socket is lost / restarts*
To deal with this problem we've introduced a token in the communication. See
the patch for a detailed implementation.
When the client needs new results from the sink, it must provide the sink with
this token. The token can be read as: "The client has successfully received the
results before the {{token}}-th one, and wants results starting from the
{{token}}-th result of the job". If the connection is lost, the data is
corrupted, or anything else goes wrong, the client can provide the sink with
the same token and get the same batch of results again.
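As a rough sketch of the token semantics (the class and method names below are made up for illustration, not taken from the patch): the sink keeps a buffer of unacknowledged results, a fetch with token {{n}} acknowledges everything before result {{n}}, and re-fetching with the same token replays the same batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the token-based fetch protocol on the sink side.
class TokenResultBuffer {
    private final List<String> buffered = new ArrayList<>();
    // Global offset of the first buffered result.
    private long firstOffset = 0;

    void add(String result) {
        buffered.add(result);
    }

    // The client asks for results starting at `token`. Everything before
    // `token` is acknowledged and dropped, which releases back pressure;
    // repeating the same token replays the same batch.
    List<String> fetch(long token, int maxResults) {
        while (firstOffset < token && !buffered.isEmpty()) {
            buffered.remove(0);
            firstOffset++;
        }
        int n = Math.min(maxResults, buffered.size());
        return new ArrayList<>(buffered.subList(0, n));
    }
}
```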
*What if the sink / server socket restarts*
To deal with this problem we've introduced a version ID in the communication.
See the patch for a detailed implementation.
This version ID is set to a random UUID when the sink opens. So if the client
discovers that the version of the results it received differs from the one it
expects, it knows that the sink has restarted, and it can throw away all the
data received after the latest checkpoint.
Here "client" means the result iterator, which is also implemented by us.
Users of this iterator will see the results with exactly-once semantics.
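The restart detection can be sketched as follows (hypothetical names; the patch's actual classes differ): the client iterator remembers the sink's version UUID and, on a mismatch, discards everything received after the latest checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the client-side version check.
class VersionedResultClient {
    private UUID expectedVersion;
    // Results received since the latest completed checkpoint.
    private final List<String> uncheckpointed = new ArrayList<>();

    // Returns true if the batch is accepted; false if a sink restart was
    // detected, in which case uncheckpointed results are discarded.
    boolean onBatch(UUID sinkVersion, List<String> results) {
        if (expectedVersion == null) {
            expectedVersion = sinkVersion;
        }
        if (!expectedVersion.equals(sinkVersion)) {
            // The sink restarted with a fresh random UUID: throw away all
            // data received after the latest checkpoint and follow the
            // new version from now on.
            uncheckpointed.clear();
            expectedVersion = sinkVersion;
            return false;
        }
        uncheckpointed.addAll(results);
        return true;
    }

    List<String> pending() {
        return uncheckpointed;
    }
}
```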
*Users may experience checkpoint interval delay*
We decided to provide users with two kinds of iterators. One directly forwards
the results to the user, but throws a {{RuntimeException}} when the sink
restarts. The other buffers the results (spilling large results to disk) and
exposes them to the user only after a checkpoint completes. Users are free to
choose according to their needs.
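The second, checkpoint-gated iterator can be sketched as below (hypothetical names, with spilling to disk omitted for brevity): results are held back and only become visible once a checkpoint completes, which is where the checkpoint-interval delay comes from.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the checkpoint-gated buffer backing the second
// iterator. A real implementation would spill large pending batches to
// disk instead of keeping them on the heap.
class CheckpointGatedResults {
    private final List<String> pending = new ArrayList<>();
    private final List<String> visible = new ArrayList<>();

    void receive(String result) {
        pending.add(result);
    }

    // On a completed checkpoint, pending results become visible to the
    // user; on a sink restart, pending results would simply be dropped.
    void onCheckpointComplete() {
        visible.addAll(pending);
        pending.clear();
    }

    List<String> visibleResults() {
        return visible;
    }
}
```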
*Why not use accumulators / blob servers*
We've considered an accumulator-based implementation before. There are a few
problems:
# With accumulators and blob servers, sinks provide the results proactively
(instead of being asked by the client). That is to say, we cannot control the
speed at which the sink produces results, and we have to store these results
somewhere.
# Accumulators are currently updated with TM heartbeats. The default heartbeat
interval is 10s, which is too slow for a small job. Also, when the results grow
large, they put a heavy burden on the network and on JM memory.
# As mentioned above, we can't store the results in some component, as it would
eventually be filled up by a never-ending streaming job. So we can't store the
results in blob servers unless the blob servers are backed by a dedicated
storage service.
> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
> Key: FLINK-14807
> URL: https://issues.apache.org/jira/browse/FLINK-14807
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Reporter: Jeff Zhang
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> Attachments: table-collect-draft.patch, table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink
> job: they have to specify a sink explicitly and then fetch the data from that
> sink via its API (e.g. write to an HDFS sink, then read the data from HDFS).
> However, most of the time users just want to get the data and do whatever
> processing they want. So it is very necessary for Flink to provide a
> Table#collect API for this purpose.
>
> Other APIs such as Table#head and Table#print would also be helpful.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)