[ https://issues.apache.org/jira/browse/FLINK-37257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jinkun Liu updated FLINK-37257:
-------------------------------
Description:
When the number of keys is small, Flink's default key-to-subtask assignment can cause severe
state skew. For example, if the only keys are 0, 1, and 2 and the Flink job
runs with a parallelism of 3, the current assignment may route all three keys to the
same subtask (e.g. subtask 2), leaving the other subtasks without any keys.
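To make the skew easy to check outside a running job, here is a minimal standalone sketch of the default key-to-subtask path as I understand it: the key's {{hashCode()}} is scrambled with MurmurHash, reduced modulo the max parallelism to a key group, and the key group is scaled to a subtask index. The mixing steps are modeled on Flink's {{MathUtils.murmurHash}}, and the max parallelism of 128 is an assumption (Flink's default lower bound for small parallelism); both should be checked against the Flink version in use. The class and method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of key -> key group -> subtask assignment, modeled on Flink's KeyGroupRangeAssignment. */
public class KeyGroupSketch {

    // MurmurHash3-style mixing of a single int, modeled on Flink's MathUtils.murmurHash.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche step (fmix32).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Fold into a non-negative value.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Key group in [0, maxParallelism).
    static int keyGroupForKey(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Each subtask owns a contiguous range of key groups.
    static int operatorIndexForKey(Object key, int maxParallelism, int parallelism) {
        int keyGroup = keyGroupForKey(key, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 128; // assumption: Flink's default max parallelism for parallelism 3
        int[] keysPerSubtask = new int[parallelism];
        for (int key = 0; key < parallelism; key++) {
            int subtask = operatorIndexForKey(key, maxParallelism, parallelism);
            keysPerSubtask[subtask]++;
            System.out.println("key " + key + " -> subtask " + subtask);
        }
        System.out.println("keys per subtask: " + Arrays.toString(keysPerSubtask));
    }
}
```

Running {{main}} prints the subtask chosen for keys 0, 1, and 2 and the number of keys per subtask; a count of 0 next to a count of 2 or 3 is the skew described in this issue.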
h3. Problem Background
The job batches the records in the data stream and sends network I/O requests every five minutes.
Requirements
# {*}Reduce Request Volume{*}: Avoid sending a separate network I/O request per
key; the resulting request volume would be too large.
# {*}Avoid Centralization{*}: Do not use windowAll, because it would funnel the
large data volume into a single subtask.
# {*}Group by Key{*}: Records with the same key must be included in the
same I/O request.
I have tried the following code:
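{code:java}
int parallelism = env.getParallelism();
stream
    // Capture the int, not env: StreamExecutionEnvironment is not serializable.
    // Math.floorMod keeps the bucket non-negative for negative hash codes.
    .keyBy(value -> Math.floorMod(value.hashCode(), parallelism))
    // Five-minute processing-time window (assumed) matching the request interval; timeWindow() is deprecated.
    .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(5)))
    .process(processFunction);{code}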
I want each parallel subtask to process exactly one key. However, Flink passes the key's
{{hashCode()}} through MurmurHash before assigning it to a key group, so when the number
of keys is small they are not spread evenly: some subtasks receive no keys at all, while
others receive several.
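Until an API like the one proposed exists, one possible workaround is to key by a pre-computed representative key per subtask: for each target subtask, search for some integer that the default assignment already routes there, and map logical bucket {{i}} to that integer. The sketch below is a minimal, framework-independent version; {{buildRoutingTable}} and its parameters are hypothetical names, and the {{keyToSubtask}} function is an assumption. In a real job it would presumably wrap {{KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)}} using the max parallelism the job actually runs with; the demo uses a toy mapping instead so it runs without Flink.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

/** Hypothetical helper: find one key per subtask so that bucket i is routed to subtask i. */
public class RoutingTable {

    /**
     * Returns an array whose entry i is a key that keyToSubtask maps to subtask i.
     * Candidate keys 0, 1, 2, ... are tried, up to maxCandidates.
     */
    static int[] buildRoutingTable(int parallelism, IntUnaryOperator keyToSubtask, int maxCandidates) {
        int[] representative = new int[parallelism];
        Arrays.fill(representative, -1); // -1 marks "no key found yet"
        int found = 0;
        for (int candidate = 0; candidate < maxCandidates && found < parallelism; candidate++) {
            int subtask = keyToSubtask.applyAsInt(candidate);
            if (representative[subtask] == -1) {
                representative[subtask] = candidate;
                found++;
            }
        }
        if (found < parallelism) {
            throw new IllegalStateException("no key found for some subtask; raise maxCandidates");
        }
        return representative;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // Toy stand-in for the real key -> subtask function (assumption for this demo):
        // keys 0..2 all go to subtask 2, imitating the skew; other keys use key % 3.
        IntUnaryOperator toyAssignment = key -> (key < 3) ? 2 : key % 3;
        int[] table = buildRoutingTable(parallelism, toyAssignment, 1000);
        System.out.println(Arrays.toString(table)); // prints [3, 4, 0]
    }
}
```

With such a table, the job would key by {{table[Math.floorMod(value.hashCode(), parallelism)]}} instead of the raw bucket, so each bucket lands on a different subtask. The representatives depend on both the parallelism and the max parallelism, so the table has to be recomputed whenever the job is rescaled, which is one argument for a first-class API instead.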
If possible, please assign this issue to me. I'd like to contribute.
> Add an API on keyedStream to allow users to customize the hash method from
> key to parallelism.
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-37257
> URL: https://issues.apache.org/jira/browse/FLINK-37257
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Affects Versions: 1.20.0, 2.0-preview
> Reporter: Jinkun Liu
> Priority: Minor
> Original Estimate: 504h
> Remaining Estimate: 504h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)