[ https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033373#comment-17033373 ]

Caizhi Weng edited comment on FLINK-14807 at 2/26/20 3:55 AM:
--------------------------------------------------------------

Hi dear Flink community,

I'm digging this up because I have some thoughts on implementing this. I'll 
post my thoughts below and I'd love to hear your opinions.

From my understanding, this problem has two key points. One is where to store 
the (possibly) never-ending results, and the other is that task managers cannot 
directly communicate with clients in some environments such as Kubernetes or 
YARN. For the never-ending results, back pressure will limit the amount of data 
buffered in the whole cluster. For the communication between task managers and 
clients, the job manager must be the man in the middle, as clients and task 
managers are both guaranteed to be able to communicate directly with the job 
manager.

So I came up with the following design. The new sink will have a parallelism 
of 1. The class names and API names below are just placeholders; a rough sketch 
of the sink side follows the numbered steps.

!table-collect.png|width=600!
 # When the sink is initialized in the task manager, it creates a socket 
server that serves the query results. The IP and port of the socket server 
are reported to the job master through the existing GlobalAggregateManager class.
 # When the client wants to fetch a portion of the query results, it contacts 
the JobManager.
 # The JobManager receives the RPC call and asks the socket server for results, 
up to a maximum size.
 # The socket server returns some results to the JobMaster.
 # The JobMaster forwards the results to the client.
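
Below is a rough, non-authoritative sketch of the sink side of the steps above, 
assuming a single-parallelism RichSinkFunction. The class name CollectSinkFunction 
and the constant MAX_BUFFERED_ROWS are placeholders, and the actual reporting of 
the address through GlobalAggregateManager as well as the thread serving the 
socket are only indicated by comments.

{code:java}
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Placeholder name; the sink is assumed to run with a parallelism of 1.
public class CollectSinkFunction extends RichSinkFunction<Row> {

    // Candidate config option for the maximum memory usage of the sink.
    private static final int MAX_BUFFERED_ROWS = 10_000;

    private transient ServerSocket serverSocket;
    private transient Queue<Row> buffer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Step 1: create the socket server; port 0 lets the OS pick a free port.
        serverSocket = new ServerSocket(0);
        buffer = new ArrayDeque<>();
        String address =
            InetAddress.getLocalHost().getHostAddress() + ":" + serverSocket.getLocalPort();
        // Step 1 (continued): report `address` to the job master through the existing
        // GlobalAggregateManager (call omitted). A background thread that accepts
        // connections and drains `buffer` on request is also omitted.
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        synchronized (buffer) {
            // Block while the buffer is full; the resulting back pressure protects TM memory.
            while (buffer.size() >= MAX_BUFFERED_ROWS) {
                buffer.wait();
            }
            buffer.add(value);
            buffer.notifyAll();
        }
    }

    @Override
    public void close() throws Exception {
        // A real implementation would block here until the client has read all results,
        // possibly with a timeout so the sink can fail if the client goes away.
        if (serverSocket != null) {
            serverSocket.close();
        }
    }
}
{code}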

Some Q&As for the design above:
 * TM memory is protected by back pressure. Do we have to introduce a new 
config option to set the maximum memory usage of the sink?
 >> Yes.
 * What if the client disconnects or never connects?
 >> The job will not finish, as the sink has not finished. The sink blocks in 
 >> the invoke method if its memory is full, or blocks in the close method if 
 >> not all results have been read by the client. We can also add a timeout 
 >> check to the sink and let the sink fail if the client goes away for a 
 >> certain period of time.
 * How do we deal with retract / upsert streams?
 >> The return type will be Tuple2<Boolean, Row>, where the boolean value 
 >> indicates whether the row is an appending row or a retracting row (a small 
 >> illustration follows this list).
 * Is the 1st step necessary?
 >> Yes, because the port of the socket server is unknown before the server is 
 >> created.
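
To make the Tuple2<Boolean, Row> answer above concrete, here is a small 
illustration of how a client could materialize a retract stream from the 
collected records; the iterator-shaped input is only an assumed form of the 
client-side API, not a committed design.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class RetractStreamExample {

    // Applies accumulate/retract messages to an in-memory view of the result table.
    public static List<Row> materialize(Iterator<Tuple2<Boolean, Row>> records) {
        List<Row> view = new ArrayList<>();
        while (records.hasNext()) {
            Tuple2<Boolean, Row> change = records.next();
            if (change.f0) {
                view.add(change.f1);    // true: an appending row
            } else {
                view.remove(change.f1); // false: a retracting row cancels a previous row
            }
        }
        return view;
    }
}
{code}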

Some problems to discuss:
 * Is the whole design necessary? Why don't we use accumulators to store and 
send the results?
 >> Accumulators cannot deal with large results, but apart from that they seem 
 >> to be OK. We could limit the maximum number of rows returned by 
 >> Table#collect and use accumulators to send results to the JobMaster with 
 >> each TM heartbeat. After collecting enough results, the client can cancel 
 >> the job. The biggest problem is that for streaming jobs we might have to 
 >> wait until the next heartbeat (10s by default) to get the results and 
 >> decide whether to cancel the job.
 * What if the job restarts?
 >> This is a question of what kind of API we want to provide.
 ** If we can tolerate an at-least-once API, this does not seem to be a problem. 
We can attach an index and a version (increased with each job restart) to each 
result when sending it back to the client, and let the client deal with the 
versioning (a bookkeeping sketch follows this list).
 ** If we want an exactly-once API, I'm afraid this is very difficult. For batch 
jobs we would have to use external storage, and for streaming jobs we might have 
to force users to enable checkpoints. Back pressure from the sink may also 
affect checkpoints.
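
The following is a rough sketch of the at-least-once bookkeeping mentioned 
above, assuming every result is sent together with a version (increased on each 
restart) and an index within that version; the class and method names are 
hypothetical, and how the client reconciles results across versions remains 
exactly the open question.

{code:java}
// Client-side bookkeeping for the at-least-once variant. With each job restart the
// version increases and the result index starts from 0 again; within one version,
// duplicates and out-of-order records are dropped.
public class CollectResultTracker {

    private long currentVersion = -1;
    private long nextIndex = 0;

    // Returns true if the result with this (version, index) should be handed to the user.
    public boolean accept(long version, long index) {
        if (version > currentVersion) {
            // The job restarted and results are re-sent under a new version.
            currentVersion = version;
            nextIndex = 0;
        } else if (version < currentVersion) {
            return false; // stale record from a previous execution
        }
        if (index == nextIndex) {
            nextIndex++;
            return true;
        }
        return false; // duplicate or out-of-order record within the current version
    }
}
{code}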


> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink 
> job unless they specify a sink explicitly and then fetch the data from that 
> sink via its API (e.g. write to an HDFS sink, then read the data back from 
> HDFS). However, most of the time users just want to get the data and do 
> whatever processing they want, so it is very necessary for Flink to provide 
> a Table#collect API for this purpose.
>  
> Other APIs such as Table#head and Table#print would also be helpful.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
