[ https://issues.apache.org/jira/browse/HUDI-4338 ]
chenfengLiu deleted comment on HUDI-4338:
-----------------------------------
was (Author: liufangqi):
[~wangxianghu] Could you take a look and review this PR ([GitHub
Pull Request #5997|https://github.com/apache/hudi/pull/5997]) when you are free?
> resolve the data skew when using flink datastream write hudi
> ------------------------------------------------------------
>
> Key: HUDI-4338
> URL: https://issues.apache.org/jira/browse/HUDI-4338
> Project: Apache Hudi
> Issue Type: Improvement
> Components: flink
> Reporter: chenfengLiu
> Assignee: chenfengLiu
> Priority: Major
> Labels: flink, pull-request-available
> Attachments: image-2022-06-28-20-00-39-158.png,
> image-2022-06-28-20-00-39-181.png
>
>
> Today, when we build a Flink DataStream to write a Hudi table, we usually use
> the org.apache.hudi.streamer.HoodieFlinkStreamer module class.
> The generated Flink DAG contains a BucketAssignFunction (required), a
> StreamWriteFunction (required), and some other optional operators.
> BucketAssignFunction assigns a bucket to each incoming record, then
> StreamWriteFunction handles the stream, which is keyed by bucket.
> {code:java}
> DataStream<Object> pipeline = hoodieDataStream
>     // Key by record key, to avoid multiple subtasks writing to a bucket at the same time
>     .keyBy(HoodieRecord::getRecordKey)
>     .transform(
>         "bucket_assigner",
>         TypeInformation.of(HoodieRecord.class),
>         new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
>     .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
>     .uid("uid_bucket_assigner")
>     // Shuffle by fileId (bucket id)
>     .keyBy(record -> record.getCurrentLocation().getFileId())
>     .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
>     .uid("uid_hoodie_stream_write")
>     .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); {code}
> To reduce the number of small files, BucketAssignFunction rolls over to a new
> bucket every 500,000 records by default. So most of the time
> BucketAssignFunction holds a number of buckets equal to its parallelism. And
> since BucketAssignFunction usually has the same parallelism as
> StreamWriteFunction, we cannot guarantee a one-to-one mapping from buckets to
> StreamWriteFunction subtasks.
> In the end we get a data skew case like this:
> !image-2022-06-28-20-00-39-158.png!!image-2022-06-28-20-00-39-181.png!
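The mismatch can be sketched with a small, self-contained model. Note this is a simplified stand-in: Flink's keyBy actually routes records through murmur-hashed key groups rather than a plain hashCode modulo, and the fileIds below are hypothetical, but the collision effect is the same — hashing N bucket ids onto N subtasks rarely yields a one-to-one mapping.

```java
import java.util.HashMap;
import java.util.Map;

public class SkewDemo {

    // Count how many buckets land on each write subtask under hash partitioning.
    // (Simplified model: hashCode modulo instead of Flink's key-group routing.)
    static Map<Integer, Integer> distribute(String[] fileIds, int parallelism) {
        Map<Integer, Integer> bucketsPerSubtask = new HashMap<>();
        for (String fileId : fileIds) {
            int subtask = Math.floorMod(fileId.hashCode(), parallelism);
            bucketsPerSubtask.merge(subtask, 1, Integer::sum);
        }
        return bucketsPerSubtask;
    }

    public static void main(String[] args) {
        int parallelism = 4; // WRITE_TASKS == BUCKET_ASSIGN_TASKS, as in the pipeline above
        // One active bucket per bucket-assign subtask (hypothetical ids):
        String[] fileIds = {"file-0001", "file-0002", "file-0003", "file-0004"};
        // Typically some subtasks receive several buckets while others stay idle.
        System.out.println("buckets per write subtask: " + distribute(fileIds, parallelism));
    }
}
```

Whatever the concrete hash values, the map rarely has exactly one bucket per subtask, which is the skew shown in the attached screenshots.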
> The data skew may cause backpressure, which makes checkpoints time out. And
> the Flink Hudi write pipeline strongly depends on checkpoint completion to
> commit the instant.
> I think we should chain the two operators when BucketAssignFunction's
> parallelism equals StreamWriteFunction's parallelism.
> In our tests this greatly improves the performance and stability of the write
> job: it resolves the data skew and reduces the network overhead.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)