[ 
https://issues.apache.org/jira/browse/BEAM-5865?focusedWorklogId=239708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-239708
 ]

ASF GitHub Bot logged work on BEAM-5865:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/19 11:35
            Start Date: 09/May/19 11:35
    Worklog Time Spent: 10m 
      Work Description: JozoVilcek commented on pull request #8499: [BEAM-5865] Create optional auto-balancing sharding function for Flink
URL: https://github.com/apache/beam/pull/8499#discussion_r282447169
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
 ##########
 @@ -211,6 +261,20 @@ boolean canTranslate(T transform, 
FlinkStreamingTranslationContext context) {
         if (WriteFilesTranslation.isWindowedWrites(transform)) {
           replacement = replacement.withWindowedWrites();
         }
+
+        if (options.isAutoBalanceWriteFilesShardingEnabled()) {
+          Preconditions.checkArgument(
+              options.getParallelism() > 0,
+              "Parallelism is required to be set in FlinkPipelineOptions when 
isAutoBalanceWriteFilesShardingEnabled");
+
+          replacement =
+              replacement.withShardingFunction(
+                  new FlinkAutoBalancedShardKeyShardingFunction<>(
+                      options.getParallelism(),
+                      options.getMaxParallelism(),
+                      sink.getDynamicDestinations().getDestinationCoder()));
+        }
+
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
             replacement.withNumShards(numShards));
 
 Review comment:
   I have a problem here. I found out that my explicit number of shards now always gets overridden here to runner-determined.
   I see that `replacement` is recreated from scratch and must be created in full. It seems that `num shards` is not explicit in the payload but gets injected here and there during the transform's `expand` (just guessing here).
   It seems I cannot just add a sharding function to the existing WriteFiles context, but need to create a fresh one. Am I right?
   
   I need help here. Is there an easy way of accessing `numShards`? Do I need to change `WriteFilesPayload` and make it explicit? @reuvenlax, what approach would you suggest to resolve this?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 239708)
    Time Spent: 5h  (was: 4h 50m)

> Auto sharding of streaming sinks in FlinkRunner
> -----------------------------------------------
>
>                 Key: BEAM-5865
>                 URL: https://issues.apache.org/jira/browse/BEAM-5865
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Jozef Vilcek
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> The Flink Runner should do auto-sharding of streaming sinks, similar to 
> BEAM-1438. That way, the user doesn't have to set shards manually which 
> introduces additional shuffling and might cause skew in the distribution of 
> data.
> As per discussion: 
> https://lists.apache.org/thread.html/7b92145dd9ae68da1866f1047445479f51d31f103d6407316bb4114c@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
