[
https://issues.apache.org/jira/browse/BEAM-5865?focusedWorklogId=238482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-238482
]
ASF GitHub Bot logged work on BEAM-5865:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/May/19 13:50
Start Date: 07/May/19 13:50
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #8499: [BEAM-5865] Create optional auto-balancing sharding function for Flink
URL: https://github.com/apache/beam/pull/8499#discussion_r281631647
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
##########
@@ -222,7 +282,117 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
@Override
 public Map<PValue, ReplacementOutput> mapOutputs(
     Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
- return Collections.emptyMap();
+ return ReplacementOutputs.tagged(outputs, newOutput);
+ }
+ }
+
+ /**
+  * Flink has a known problem of assigning keys unevenly to key groups (and then workers) when
+  * the number of keys is not much larger than the number of key groups. This is a typical
+  * scenario when writing files, where one does not want to end up with too many small files.
+  * This {@link ShardingFunction} implements what was suggested on Flink's <a
+  * href="http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3CCAOUjMkygmFMDbOJNCmndrZ0bYug=iQJmVz6QvMW7C87n=pw...@mail.gmail.com%3E">mailing
+  * list</a> and chooses shard keys so that Flink distributes them evenly among workers.
+  */
+ private static class FlinkAutoBalancedShardKeyShardingFunction<UserT, DestinationT>
+     implements ShardingFunction<UserT, DestinationT> {
+
+ private final int parallelism;
+ private final int maxParallelism;
+ private final Coder<DestinationT> destinationCoder;
+ private final Map<CacheKey, ShardedKey<Integer>> cache = new HashMap<>();
+ private final Set<Integer> usedSalts = new HashSet<>();
+
+ private int shardNumber = -1;
+
+ private FlinkAutoBalancedShardKeyShardingFunction(
+     int parallelism, int maxParallelism, Coder<DestinationT> destinationCoder) {
+ this.parallelism = parallelism;
+ this.maxParallelism = maxParallelism;
+ this.destinationCoder = destinationCoder;
+ }
+
+ @Override
+ public ShardedKey<Integer> assignShardKey(
+     DestinationT destination, UserT element, int shardCount) throws Exception {
+
+ if (shardNumber == -1) {
+ shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
+ } else {
+ shardNumber = (shardNumber + 1) % shardCount;
+ }
+
+ int destinationKey =
+     Arrays.hashCode(CoderUtils.encodeToByteArray(destinationCoder, destination));
+ // we need to ensure that keys are always stable no matter at which worker
+ // they are created and what the order of observed shard numbers is
+ if (cache.size() < shardNumber) {
+ for (int i = 0; i < shardNumber; i++) {
+ generateInternal(new CacheKey(destinationKey, i));
+ }
+ }
+
+ return generateInternal(new CacheKey(destinationKey, shardNumber));
+ }
+
+ private ShardedKey<Integer> generateInternal(CacheKey key) {
+
+ ShardedKey<Integer> result = cache.get(key);
+ if (result != null) {
+ return result;
+ }
+
+ int salt = -1;
+ while (true) {
+ salt++;
+ ShardedKey<Integer> shk = ShardedKey.of(Objects.hash(key.key, salt), key.shard);
+ int targetPartition = key.shard % parallelism;
+
+ // create the effective key the same way Beam/Flink will, so we can see
+ // if it gets allocated to the partition we want
+ ByteBuffer effectiveKey;
+ try {
+ byte[] bytes = CoderUtils.encodeToByteArray(ShardedKeyCoder.of(VarIntCoder.of()), shk);
Review comment:
use `FlinkKeyUtils`
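
The javadoc above refers to Flink's key-group assignment: a key is hashed into one of `maxParallelism` key groups, and key group `g` is served by operator `g * parallelism / maxParallelism`. The sketch below (not Beam or Flink code; Flink additionally murmur-hashes the key's `hashCode`, which is omitted here, and `KeyGroupSkewDemo`/`operatorFor` are hypothetical names) illustrates why a handful of shard keys can collapse onto a single worker:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupSkewDemo {

  // Simplified stand-in for Flink's key-group arithmetic:
  // keyGroup = hash % maxParallelism, operator = keyGroup * parallelism / maxParallelism.
  // (Flink also murmur-hashes the key's hashCode; omitted here for brevity.)
  static int operatorFor(int keyHash, int maxParallelism, int parallelism) {
    int keyGroup = Math.floorMod(keyHash, maxParallelism);
    return keyGroup * parallelism / maxParallelism;
  }

  public static void main(String[] args) {
    int parallelism = 4;       // number of workers
    int maxParallelism = 128;  // number of key groups
    Map<Integer, Integer> keysPerOperator = new HashMap<>();
    // With only 4 shard keys (0..3), their small hashes all fall into low
    // key groups, and every low key group maps to operator 0: maximal skew.
    for (int shard = 0; shard < parallelism; shard++) {
      int op = operatorFor(Integer.hashCode(shard), maxParallelism, parallelism);
      keysPerOperator.merge(op, 1, Integer::sum);
    }
    System.out.println(keysPerOperator); // prints {0=4}: one worker gets all keys
  }
}
```

The salt-search loop in `generateInternal` above works around exactly this: it keeps incrementing a salt until the salted key happens to land on the operator the shard is intended for.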
Issue Time Tracking
-------------------
Worklog Id: (was: 238482)
Time Spent: 2h 10m (was: 2h)
> Auto sharding of streaming sinks in FlinkRunner
> -----------------------------------------------
>
> Key: BEAM-5865
> URL: https://issues.apache.org/jira/browse/BEAM-5865
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Jozef Vilcek
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The Flink Runner should do auto-sharding of streaming sinks, similar to
> BEAM-1438. That way, the user doesn't have to set shards manually which
> introduces additional shuffling and might cause skew in the distribution of
> data.
> As per discussion:
> https://lists.apache.org/thread.html/7b92145dd9ae68da1866f1047445479f51d31f103d6407316bb4114c@%3Cuser.beam.apache.org%3E
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)