wangxianghu edited a comment on pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#issuecomment-692604118


   > That sounds promising. We cannot easily fix `T,R,O` though right at the 
class level. it will be different in each setting, right?
   > 
   > We can use generic methods though I think and what I had in mind was 
something little different
   > if the method signature can be the following
   > 
   > ```
   > public abstract <I,O> List<O> parallelDo(List<I> data, Function<I, O> 
func, int parallelism); 
   > ```
   > 
   > and for the Spark the implementation
   > 
   > ```
   > public <I,O> List<O> parallelDo(List<I> data, Function<I, O> func, int 
parallelism) {
   >    return jsc.parallelize(data, parallelism).map(func).collectAsList();
   > }
   > ```
   > 
   > and the invocation
   > 
   > ```
   > engineContext.parallelDo(Arrays.asList(1,2,3), (a) -> {
   >       return 0;
   >     }, 3);
   > ```
   > 
   > Can you a small Flink implementation as well and confirm this approach 
will work for Flink as well. Thats another key thing to answer, before we 
finalize the approach.
   > 
   > Note that
   > 
   > a) some of the code that uses `rdd.mapToPair()` etc before collecting, 
have to be rewritten using Java Streams. i.e
   > 
   > `engineContext.parallelDo().map()` instead of 
`jsc.parallelize().map().mapToPair()` currently.
   > 
   > b) Also we should limit the use of parallelDo() to only cases where there 
are no grouping/aggregations involved. for e.g if we are doing 
`jsc.parallelize(files, ..).map(record).reduceByKey(..)`, collecting without 
reducing will lead to OOMs. We can preserve what you are doing currently, for 
such cases.
   > 
   > But the parallelDo should help up reduce the amount of code broken up (and 
thus reduce the effort for the flink engine).
   > 
   > @wangxianghu I am happy to shepherd this PR through from this point as 
well. lmk
   > 
   > cc @yanghua @leesf
   
   Yes, I noticed that problem lately. we should use generic methods. I also 
agree with the a) and b) you mentioned.
   In the `parallelDo` method flink engine can operate the list directly using 
Java Stream, I have verified that. but there is a problem:
   The function used in spark map operator is 
`org.apache.spark.api.java.function.Function` while what flink can use is 
`java.util.function.Function` we should align it. 
   
   maybe we can use `java.util.function.Function` only,  If this way, there is 
no need to distinguish spark and flink, there both use java stream to implement 
those kind operations. the generic method could be like this:
   ```
   public class HoodieEngineContext {
     public <I, O> List<O> parallelDo(List<I> data, Function<I, O> func) {
       return data.stream().map(func).collect(Collectors.toList());
     }
   }
   ```
   or implement the logic using java stream directly and don't introduce the 
`parallelDo` method.
   WDYT?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to