wangxianghu edited a comment on pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#issuecomment-692604118
> That sounds promising. We cannot easily fix `T,R,O` at the class level though, right? It will be different in each setting.
>
> We can use generic methods though, I think, and what I had in mind was something a little different.
> The method signature could be the following:
>
> ```
> public abstract <I,O> List<O> parallelDo(List<I> data, Function<I, O>
func, int parallelism);
> ```
>
> and for the Spark the implementation
>
> ```
> public <I,O> List<O> parallelDo(List<I> data, Function<I, O> func, int
parallelism) {
> return jsc.parallelize(data, parallelism).map(func).collectAsList();
> }
> ```
>
> and the invocation
>
> ```
> engineContext.parallelDo(Arrays.asList(1,2,3), (a) -> {
> return 0;
> }, 3);
> ```
>
> Can you try a small Flink implementation as well and confirm this approach will work for Flink too? That's another key thing to answer before we finalize the approach.
>
> Note that
>
> a) some of the code that uses `rdd.mapToPair()` etc. before collecting will have to be rewritten using Java Streams, i.e. `engineContext.parallelDo().map()` instead of the current `jsc.parallelize().map().mapToPair()`.
>
> b) Also, we should limit the use of `parallelDo()` to cases where no grouping/aggregations are involved. e.g. if we are doing `jsc.parallelize(files, ..).map(record).reduceByKey(..)`, collecting without reducing will lead to OOMs. We can preserve what you are doing currently for such cases.
>
> But `parallelDo` should help us reduce the amount of code that needs to be broken up (and thus reduce the effort for the Flink engine).
>
> @wangxianghu I am happy to shepherd this PR through from this point as
well. lmk
>
> cc @yanghua @leesf
Yes, I noticed that problem recently; we should use generic methods. I also agree with points a) and b) that you mentioned.
In the `parallelDo` method, the Flink engine can operate on the list directly using Java Streams; I have verified that. But there is a problem: the function used in the Spark `map` operator is `org.apache.spark.api.java.function.Function`, while what Flink can use is `java.util.function.Function`, so we should align them.
Maybe we can use `java.util.function.Function` only. That way, there is no need to distinguish Spark from Flink; both can use Java Streams to implement these kinds of operations. The generic method could look like this:
```
public class HoodieEngineContext {
    public <I, O> List<O> parallelDo(List<I> data, Function<I, O> func) {
        return data.stream().map(func).collect(Collectors.toList());
    }
}
```
WDYT?
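To make the idea concrete, here is a minimal, self-contained sketch of the shared signature built on `java.util.function.Function` (the class name and the parallel-stream default are just illustrative, not the final Hudi API). The Spark context would override this with `jsc.parallelize(data, parallelism).map(func).collectAsList()`, while a Flink or local context could fall back to a parallel Java Stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: an engine-agnostic parallelDo sharing one
// java.util.function.Function-based signature across Spark and Flink.
public class HoodieEngineContextSketch {

    // Default implementation: a parallel Java Stream stands in for the
    // engine's own parallelism. A Spark subclass would override this with
    // jsc.parallelize(data, parallelism).map(func).collectAsList().
    public <I, O> List<O> parallelDo(List<I> data, Function<I, O> func, int parallelism) {
        // parallelStream() + collect(toList()) preserves encounter order.
        return data.parallelStream().map(func).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        HoodieEngineContextSketch ctx = new HoodieEngineContextSketch();
        // Mirrors the invocation style from the comment above.
        List<Integer> result = ctx.parallelDo(Arrays.asList(1, 2, 3), a -> a * 10, 3);
        System.out.println(result); // prints [10, 20, 30]
    }
}
```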
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]