[ https://issues.apache.org/jira/browse/HUDI-4338 ]


    chenfengLiu deleted comment on HUDI-4338:
    -----------------------------------

was (Author: liufangqi):
[~wangxianghu] Could you help to take a glance and review this PR ([GitHub Pull Request #5997|https://github.com/apache/hudi/pull/5997]) when you are free?

> resolve the data skew when using flink datastream write hudi
> ------------------------------------------------------------
>
>                 Key: HUDI-4338
>                 URL: https://issues.apache.org/jira/browse/HUDI-4338
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink
>            Reporter: chenfengLiu
>            Assignee: chenfengLiu
>            Priority: Major
>              Labels: flink, pull-request-available
>         Attachments: image-2022-06-28-20-00-39-158.png, 
> image-2022-06-28-20-00-39-181.png
>
>
> Now, when we create a Flink DataStream to write to a Hudi table, we usually 
> use the org.apache.hudi.streamer.HoodieFlinkStreamer module class.
> The generated Flink DAG contains BucketAssignFunction (required), 
> StreamWriteFunction (required), and some other optional operators. 
> BucketAssignFunction assigns a bucket to each incoming record, and then 
> StreamWriteFunction handles the stream, which is keyed by bucket.
> {code:java}
> DataStream<Object> pipeline = hoodieDataStream
>     // Key by record key, to avoid multiple subtasks writing to a bucket at the same time
>     .keyBy(HoodieRecord::getRecordKey)
>     .transform(
>         "bucket_assigner",
>         TypeInformation.of(HoodieRecord.class),
>         new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
>     .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
>     .uid("uid_bucket_assigner")
>     // Shuffle by fileId (bucket id)
>     .keyBy(record -> record.getCurrentLocation().getFileId())
>     .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
>     .uid("uid_hoodie_stream_write")
>     .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); {code}
> To reduce the number of small files, BucketAssignFunction rolls over to a new 
> bucket every 500,000 records by default. So most of the time, 
> BucketAssignFunction holds a number of active buckets equal to its 
> parallelism. And since BucketAssignFunction usually has the same parallelism 
> as StreamWriteFunction, we cannot guarantee that each bucket is sent to a 
> different StreamWriteFunction task.
> And finally we get a data skew case like this:
> !image-2022-06-28-20-00-39-158.png!!image-2022-06-28-20-00-39-181.png!
> The data skew may cause backpressure, which makes checkpoints time out. And 
> the Flink Hudi write pipeline strongly depends on checkpoint completion to 
> commit the instant.
> I think we should chain the operators when BucketAssignFunction's parallelism 
> equals StreamWriteFunction's parallelism. 
> After testing, this greatly improves the performance and stability of the 
> write job. It resolves the data skew and reduces the network overhead. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)