[
https://issues.apache.org/jira/browse/FLINK-13943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976354#comment-16976354
]
Caizhi Weng edited comment on FLINK-13943 at 11/18/19 8:33 AM:
---------------------------------------------------------------
Hi dear Flink community. My ideas for this improvement are as follows:
h3. Solution 1: Using {{CollectTableSink}} and {{CollectOutputFormat}}
The Blink planner already contains {{CollectTableSink}} and {{CollectOutputFormat}}
classes which can serialize data streams into a list using
{{SerializedListAccumulator}}. These classes greatly simplify this
improvement.
As a starting point, we can add a method very similar to
{{DataSet#collect}}: we execute the current table, fetch the results
collected by the accumulators, and deserialize them into the desired list.
The problem is that this solution only applies to batch jobs whose results
are of moderate size. It does not work for batch jobs with huge results or
for never-ending streaming jobs, because we cannot hold all of their results
in memory.
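To make the accumulator idea concrete, here is a minimal plain-Java sketch. {{SerializedListSketch}} is a hypothetical class, not the real {{SerializedListAccumulator}} API, and Java serialization stands in for Flink's type serializers:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the Solution 1 idea: records are serialized on the
 * task side, kept as byte arrays, and deserialized on the client once the job
 * has finished. Plain Java serialization stands in for Flink's serializers.
 */
class SerializedListSketch<T extends Serializable> {
    // Every serialized record stays in memory until the job ends, which is
    // why this approach only suits results of moderate size.
    private final List<byte[]> serialized = new ArrayList<>();

    /** Task side: serialize one record and append it. */
    void add(T record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        serialized.add(bytes.toByteArray());
    }

    /** Client side: deserialize the complete result into a list. */
    @SuppressWarnings("unchecked")
    List<T> toList() {
        List<T> result = new ArrayList<>(serialized.size());
        for (byte[] b : serialized) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
                result.add((T) in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }

    /** Bytes currently held; grows with the result and is never released early. */
    long sizeInBytes() {
        long total = 0;
        for (byte[] b : serialized) {
            total += b.length;
        }
        return total;
    }
}
```

Because {{toList}} only makes sense after every record has been added, the whole result has to fit in memory at once, which is exactly the limitation described above.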
h3. Solution 2: Using {{DataStreamUtils::collect}}
If we do not need the result as a complete list, we can also fetch it
iteratively. The flink-streaming-java module already contains the
{{DataStreamUtils::collect}} method, which returns an iterator that reads
results from the local or remote execution environment.
The good thing is that this solution works for both batch and streaming
jobs. I haven't tested it yet, but from my understanding of the code it has
two problems: we will have to send the results buffer by buffer instead of
record by record (otherwise it might be too slow), and if the client
consumes the results too slowly, it might block the whole task.
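A minimal sketch of the client side of buffer-by-buffer fetching follows. It assumes a hypothetical {{fetchBatch}} transport that returns one buffer (a {{List}}) per request and {{null}} once the job has finished; it is not the actual {{DataStreamUtils::collect}} implementation. Records travel in batches but are handed to the caller one at a time:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/**
 * Hypothetical client-side iterator: results arrive buffer by buffer
 * (one List per fetch) but are returned to the caller record by record.
 */
class BatchedResultIterator<T> implements Iterator<T> {
    // Assumed transport: returns the next buffer of records, or null once the job has finished.
    private final Supplier<List<T>> fetchBatch;
    private final Deque<T> current = new ArrayDeque<>(); // records must be non-null
    private boolean finished = false;
    private int fetches = 0;

    BatchedResultIterator(Supplier<List<T>> fetchBatch) {
        this.fetchBatch = fetchBatch;
    }

    @Override
    public boolean hasNext() {
        // Fetch only when the local buffer is drained; skip over empty buffers.
        while (current.isEmpty() && !finished) {
            List<T> batch = fetchBatch.get();
            fetches++;
            if (batch == null) {
                finished = true;
            } else {
                current.addAll(batch);
            }
        }
        return !current.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.pollFirst();
    }

    /** Number of round trips made so far (for illustration). */
    int fetchCount() {
        return fetches;
    }
}
```

Fetching per buffer keeps the number of round trips proportional to the number of batches rather than the number of records. In a real implementation {{fetchBatch}} would presumably wait or retry while a streaming job is still running but has produced nothing new.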
To solve this, we may introduce two config options. The first controls the
heap memory that {{CollectSink}} may use to temporarily buffer unconsumed
results; the second sets the consuming strategy (BLOCKING or NON_BLOCKING).
With the blocking strategy, {{CollectSink}} blocks the task when its memory
buffer is full; with the non-blocking strategy, it throws away the oldest
unconsumed results to make space for new ones.
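The two options could behave roughly as in the following sketch. {{CollectBuffer}} and its {{Strategy}} enum are hypothetical names, and capacity is counted in records rather than bytes of heap memory for simplicity:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of the sink-side buffer proposed for Solution 2.
 * Capacity is counted in records here; the proposal measures heap memory.
 */
class CollectBuffer<T> {
    /** The two proposed consuming strategies. */
    enum Strategy { BLOCKING, NON_BLOCKING }

    private final Deque<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private final Strategy strategy;
    private long dropped = 0;

    CollectBuffer(int capacity, Strategy strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.strategy = strategy;
    }

    /** Called by the sink task for every (non-null) record. */
    synchronized void put(T record) {
        while (buffer.size() >= capacity) {
            if (strategy == Strategy.BLOCKING) {
                try {
                    wait(); // the task stalls until the client consumes a record
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while buffer was full", e);
                }
            } else {
                buffer.pollFirst(); // drop the oldest unconsumed record
                dropped++;
            }
        }
        buffer.addLast(record);
    }

    /** Called when the client fetches; returns null if nothing is buffered. */
    synchronized T poll() {
        T record = buffer.pollFirst();
        if (record != null) {
            notifyAll(); // wake a producer blocked in put()
        }
        return record;
    }

    synchronized long droppedCount() {
        return dropped;
    }
}
```

With BLOCKING, a full buffer applies back-pressure to the job; with NON_BLOCKING the job never stalls, but a slow client silently misses records, which a counter like {{droppedCount}} would at least make visible.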
was (Author: tsreaper): the Solution 1 text above, without the section headings and without Solution 2.
> Provide api to convert flink table to java List (e.g. Table#collect)
> --------------------------------------------------------------------
>
> Key: FLINK-13943
> URL: https://issues.apache.org/jira/browse/FLINK-13943
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Reporter: Jeff Zhang
> Assignee: Caizhi Weng
> Priority: Major
>
> It would be nice to convert a Flink table to a Java List so that I can do
> other data manipulation on the client side after executing a Flink job. For
> the Flink planner, I can convert a Flink table to a DataSet and use
> DataSet#collect, but for the Blink planner there is no such API.
> EDIT from FLINK-14807:
> Currently, it is very inconvenient for users to fetch the data of a Flink job
> unless they specify a sink explicitly and then fetch data from that sink via
> its API (e.g. write to an HDFS sink, then read the data from HDFS). However,
> most of the time users just want to get the data and do whatever processing
> they want. So it is important for Flink to provide an API such as
> Table#collect for this purpose.
> Other APIs such as Table#head and Table#print would also be helpful.