[
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033373#comment-17033373
]
Caizhi Weng edited comment on FLINK-14807 at 2/26/20 3:51 AM:
--------------------------------------------------------------
Hi dear Flink community,
I'm digging this up because I have some thoughts on implementing this. I'll
post my thoughts below and I'd love to hear your opinions.
From my understanding, this problem has two key points. One is where to store
the (possibly) never-ending results, and the other is that task managers cannot
directly communicate with clients in some environments like Kubernetes or YARN.
For the never-ending results, back pressure will limit the size of the data
buffered in the whole cluster. For the communication between task managers and
clients, the job manager must be the man in the middle, as clients and task
managers are guaranteed to be able to communicate directly with the job manager.
So I came up with the following design. The new sink will have a parallelism
of 1. The class names and API names below are just placeholders.
!table-collect.png|width=600!
# When the sink is initialized in the task manager, it will create a socket
server for serving the query results (see the sketch after this list). The IP
and port of the socket server will be given to the job master via the existing
GlobalAggregateManager class.
# When the client wants to fetch a portion of the query result, it contacts the
JobManager.
# The JobManager receives the RPC call and asks the socket server for results,
with a maximum size.
# The socket server returns some results to the JobMaster.
# The JobMaster forwards the results to the client.
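To make the sink side a bit more concrete, here is a minimal sketch of step 1.
All class and method names (CollectSinkFunction, registerSinkAddress) are
placeholders I made up for illustration, and the registration helper only
stands in for the GlobalAggregateManager-based mechanism mentioned above:
{code:java}
// Sketch only: CollectSinkFunction and registerSinkAddress are placeholders.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.net.InetAddress;
import java.net.ServerSocket;

public class CollectSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {

    private transient ServerSocket serverSocket;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Bind to an ephemeral port; the port is only known after creation,
        // which is why the registration in step 1 is needed.
        serverSocket = new ServerSocket(0);
        String host = InetAddress.getLocalHost().getHostAddress();
        int port = serverSocket.getLocalPort();
        registerSinkAddress(host, port);
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        // Buffer the result for the socket server; see the buffering sketch
        // further below for how this blocks when the buffer is full.
    }

    @Override
    public void close() throws Exception {
        // In the real sink this would first wait until all buffered results are read.
        if (serverSocket != null) {
            serverSocket.close();
        }
    }

    private void registerSinkAddress(String host, int port) {
        // Placeholder: report (host, port) to the job master, e.g. via the
        // existing GlobalAggregateManager as described in step 1.
    }
}
{code}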
Some Q&As for the design above:
* TM memory is protected by back pressure. Do we have to introduce a new
config option to set the maximum memory usage of the sink?
>> Yes.
* What if the client disconnects / does not connect?
>> The job will not finish as the sink isn't finished. The sink blocks in the
>> invoke method if its memory is full, or blocks in the close method if not
>> all results have been read by the client (see the sketch after this list).
* How to deal with retract / upsert streams?
>> The return type will be Tuple2<Boolean, Row>, where the boolean value
>> indicates whether this row is an appending row or a retracting row.
* Is the 1st step necessary?
>> Yes, because the port of the socket server is unknown before it is created.
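To make the back pressure and blocking answers above concrete, here is a rough
sketch of the bounded buffer the sink could use. The class name and
maxBufferedRows are made up; maxBufferedRows stands in for the new config
option, and the boolean in Tuple2<Boolean, Row> is the append/retract flag from
the Q&A above:
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the buffering part of the sink; the socket server thread (not shown)
// calls poll() whenever the job master asks for results.
public class BlockingResultBuffer {

    private final BlockingQueue<Tuple2<Boolean, Row>> buffer;

    public BlockingResultBuffer(int maxBufferedRows) {
        // maxBufferedRows stands in for the new config option limiting sink memory.
        this.buffer = new ArrayBlockingQueue<>(maxBufferedRows);
    }

    // Called from invoke(): blocks when the buffer is full, back pressuring the
    // job and bounding the amount of data held in the cluster.
    public void add(Tuple2<Boolean, Row> result) throws InterruptedException {
        buffer.put(result);
    }

    // Called from close(): blocks until the client has read all remaining results.
    public void waitUntilDrained() throws InterruptedException {
        while (!buffer.isEmpty()) {
            Thread.sleep(100);
        }
    }

    // Called by the socket server: returns at most maxSize buffered results.
    public List<Tuple2<Boolean, Row>> poll(int maxSize) {
        List<Tuple2<Boolean, Row>> batch = new ArrayList<>();
        buffer.drainTo(batch, maxSize);
        return batch;
    }
}
{code}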
Some problems to discuss:
* Is the whole design necessary? Why don't we use accumulators to store and
send results?
>> Accumulators cannot deal with large results, but apart from this they seem
>> to be OK. We can limit the maximum number of rows provided by Table#collect
>> and use accumulators to send results to the JobMaster with each TM
>> heartbeat. After collecting enough results the client can cancel the job.
>> The biggest problem is that for streaming jobs we might have to wait until
>> the next heartbeat (10s by default) to get the results and decide whether to
>> cancel the job.
* What if the job restarts?
>> This is a question of what kind of API we want to provide.
** If we can tolerate an at-least-once API this does not seem to be a problem.
We can attach the index and the version (increased with each job restart) to
each result when sending it back to the client, and let the client deal with
the versioning (see the sketch below).
** If we want an exactly-once API I'm afraid this is very difficult. For batch
jobs we would have to use external storage, and for streaming jobs we might
have to force users to use checkpoints. Back pressure from the sink may also
affect checkpoints.
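For the at-least-once option above, a small sketch of what the client-side
bookkeeping could look like. The (version, index) tagging and the class name
are purely illustrative assumptions:
{code:java}
// Sketch of client-side handling of (version, index) for an at-least-once API.
// The server is assumed to tag every result with a version (increased on each
// job restart) and an index within that version; this class only tracks progress.
public class AtLeastOnceResultTracker {

    private long currentVersion = -1;
    private long nextIndex = 0;

    // Returns true if the result is new within the current version. When the
    // version increases after a restart, results start over from index 0, so the
    // client may see some rows again, which is exactly the at-least-once guarantee.
    public boolean accept(long version, long index) {
        if (version > currentVersion) {
            currentVersion = version;
            nextIndex = 0;
        }
        if (version == currentVersion && index >= nextIndex) {
            nextIndex = index + 1;
            return true;
        }
        return false;
    }
}
{code}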
was (Author: tsreaper):
Hi dear Flink community,
I'm digging this up because I have some thoughts on implementing this. I'll
post my thoughts below and I'd love to hear your opinions.
From my understanding, this problem has two key points. One is where to store
the (possibly) never-ending results, and the other is that task managers can
directly communicate with clients under some environments like k8s or yarn.
For the never-ending results, back pressure will limit the size of the data in
the whole cluster. For the communication between task managers and clients, the
job manager must be the man in the middle, as clients and task managers are
guaranteed to directly communicate with the job manager.
As the REST API is the main communication method between clients and job
managers, I'm going to introduce two new internal REST APIs and a new sink to
do the job. The new sink will have a parallelism of 1. The class names and API
names below are just placeholders.
!table-collect.png|width=600!
# When the sink is initialized in the task manager, it will create a socket
server for serving the query results. The IP and port of the socket server
will be given to the REST server by a REST API call.
# When the client wants to fetch a portion of the query result, it provides the
job id and a token to the REST server. Here the token is an integer indicating
the index of the first result to return. Note that the token must be
non-decreasing across API calls for the same job (see the sketch after this list).
# The REST server receives the API call and asks the socket server for results,
with a maximum size.
# The socket server returns some results to the REST server.
# The REST server forwards the results to the client.
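For reference, a sketch of what the token-based fetching in step 2 could look
like from the client side. The RestFetchClient and FetchResponse types and the
fetchResults call are placeholders for the two new internal REST APIs, not
existing Flink classes:
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.List;

// Placeholders standing in for the two new internal REST APIs.
interface RestFetchClient {
    FetchResponse fetchResults(String jobId, long token, int maxSize) throws IOException;
}

interface FetchResponse {
    List<Tuple2<Boolean, Row>> getResults();
}

// Sketch of the client-side fetch loop. The token is the index of the first
// result of the next batch and is only advanced after a batch has been
// received, so it is non-decreasing across calls for the same job.
public class TokenBasedResultFetcher {

    private final RestFetchClient client;
    private final String jobId;
    private long token = 0;

    public TokenBasedResultFetcher(RestFetchClient client, String jobId) {
        this.client = client;
        this.jobId = jobId;
    }

    public List<Tuple2<Boolean, Row>> nextBatch(int maxSize) throws IOException {
        // If the previous call failed mid-transfer, re-sending the same token
        // simply returns the same batch again (see the Q&A below).
        FetchResponse response = client.fetchResults(jobId, token, maxSize);
        token += response.getResults().size();
        return response.getResults();
    }
}
{code}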
Some Q&As for the design above:
* TM memory is protected by back pressure. Do we have to introduce a new
config option to set the maximum memory usage of the sink?
>> Yes.
* What if the client disconnects?
>> The REST API is stateless, so it does not suffer from disconnections. With
>> each API call the client must provide a token to the server indicating the
>> index of the first result it would like to read, so if something goes wrong
>> while the client is fetching a batch of results, we just need to provide the
>> same token to the REST server and the same batch of results will come again.
* What if the client does not connect?
>> The job will not finish as the sink isn't finished.
* What if the job restarts?
>> For streaming jobs we have state to recover from, so it's not a big
>> problem. For batch jobs, as clients provide tokens to the REST server, the
>> REST server can just skip the results before the token and continue to work
>> normally.
* How to deal with retract / upsert streams?
>> The return type will be Tuple2<Boolean, Row>, where the boolean value
>> indicates whether this row is an appending row or a retracting row.
* Is the 1st step necessary?
>> Yes, because the port of the socket server is unknown before it is created.
> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
> Key: FLINK-14807
> URL: https://issues.apache.org/jira/browse/FLINK-14807
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Reporter: Jeff Zhang
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job
> unless they specify a sink explicitly and then fetch data from this sink via
> its API (e.g. write to an HDFS sink, then read the data from HDFS). However,
> most of the time users just want to get the data and do whatever processing
> they want. So it is very necessary for Flink to provide a Table#collect API
> for this purpose.
>
> Other APIs such as Table#head and Table#print are also helpful.
>