Hi,

Thank you for the clarifications (I am working with a KeyedStream, so a custom 
partitioner does not help).

So I should set maxParallelism >= parallelism and change my keys (from 
input.keyBy(0)) so that the key group assignment works as expected, but I 
can’t modify these keys to make it work.
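If the original keys must stay untouched, one possible workaround is to keep the original key inside the record and key the stream by a derived "routing key". This is a sketch of the approach Till suggests below, not a Flink API. Each routing key is chosen so that it lands on a different subtask. The helper below searches for such routing keys. It assumes the key-to-subtask math mirrors Flink's MathUtils.murmurHash and KeyGroupRangeAssignment, so verify it against your Flink version. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper: for every subtask index, find one Integer "routing key"
// whose key group is assigned to that subtask. The routing math is a standalone
// copy modeled on Flink's MathUtils.murmurHash / KeyGroupRangeAssignment (assumption).
final class RoutingKeys {

    // Murmur3-style integer hash applied to key.hashCode(); result is non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // key -> key group -> subtask index.
    static int subtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // routing[i] is the smallest non-negative int that is routed to subtask i.
    static int[] findRoutingKeys(int maxParallelism, int parallelism) {
        if (maxParallelism < parallelism) {
            throw new IllegalArgumentException("maxParallelism must be >= parallelism");
        }
        int[] routing = new int[parallelism];
        Arrays.fill(routing, -1);
        int found = 0;
        for (int candidate = 0; candidate < Integer.MAX_VALUE && found < parallelism; candidate++) {
            int subtask = subtaskFor(candidate, maxParallelism, parallelism);
            if (routing[subtask] == -1) {
                routing[subtask] = candidate;
                found++;
            }
        }
        if (found < parallelism) {
            throw new IllegalStateException("no routing key found for some subtask");
        }
        return routing;
    }

    public static void main(String[] args) {
        int parallelism = 8;       // assumption: number of slots
        int maxParallelism = 128;  // assumption: value passed to env.setMaxParallelism()
        System.out.println(Arrays.toString(findRoutingKeys(maxParallelism, parallelism)));
    }
}
```

With parallelism equal to the number of keys, the j-th original key would then be mapped to routing[j], for example inside the KeySelector used for keyBy. The table depends on both maxParallelism and parallelism, so it has to be recomputed whenever either one changes.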

The other option is to change Flink’s internals to distribute the keys evenly 
(changing computeKeyGroupForKeyHash: is this enough?).
What I was looking for was an API to change the way the key group assignment 
is done, without changing Flink’s runtime.
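The question about computeKeyGroupForKeyHash becomes clearer once the full routing path is spelled out. As Till describes below, a key reaches a subtask in two steps: first the hash selects a key group, then the key group selects an operator index. The following is a minimal standalone sketch of those two steps. It assumes they mirror Flink's MathUtils.murmurHash and KeyGroupRangeAssignment. The class and method names here are hypothetical, so check the exact code of your Flink version.

```java
import java.util.Arrays;

// Hypothetical standalone sketch of the two-step key -> subtask routing,
// modeled on Flink's MathUtils.murmurHash and KeyGroupRangeAssignment (assumption).
final class KeyGroupSketch {

    // Murmur3-style integer hash applied to key.hashCode(); result is non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Step 1: key -> key group in [0, maxParallelism).
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> operator (subtask) index in [0, parallelism).
    static int operatorIndexFor(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 4; // e.g. env.setMaxParallelism(4)
        int parallelism = 4;
        int[] keysPerSubtask = new int[parallelism];
        for (int key = 1; key <= 8; key++) {
            int group = keyGroupFor(key, maxParallelism);
            int subtask = operatorIndexFor(maxParallelism, parallelism, group);
            keysPerSubtask[subtask]++;
            System.out.println("key " + key + " -> key group " + group + " -> subtask " + subtask);
        }
        System.out.println("keys per subtask: " + Arrays.toString(keysPerSubtask));
    }
}
```

In this sketch the subtask index depends on both steps. Changing only the hash-to-key-group function changes which key group a key falls into. The split of key groups among subtasks stays keyGroup * parallelism / maxParallelism.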

I think the maxParallelism setting alone is not enough (it still leads to an 
inefficient distribution of the data to be processed when using a KeyedStream).
Is it possible to expose the key group assignment somehow?

This is how the keys are distributed (1024 keys, key=1..1024; number of groups 
from 2 to 16, equivalent to the parallelism, i.e. the number of slots, which is 
the number at the end of each line):

{0=517, 1=507} 2
{0=881, 1=809, 2=358} 3
{0=1139, 1=1048, 2=617, 3=268} 4
{0=1319, 1=1268, 2=829, 3=473, 4=207} 5
{0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
{0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
{0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
{0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
{0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
{0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
{0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
{0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
{0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
{0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
{0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
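Per-subtask key counts for a given setting can also be recomputed offline, without running a job. The sketch below is hypothetical and rests on three assumptions. First, the keys are the Integer values 1..1024, hashed via Integer.hashCode(). Second, maxParallelism is set equal to the parallelism. Third, the routing math mirrors Flink's MathUtils.murmurHash and KeyGroupRangeAssignment. With keyBy(0) on a tuple, the actual key object (and its hashCode()) may differ. The sketch also counts keys rather than records. For both reasons, its output need not match the table above.

```java
import java.util.Arrays;

// Hypothetical offline check: how many of the keys 1..numKeys land on each subtask,
// using a standalone copy of the routing math modeled on Flink (assumption).
final class KeyDistribution {

    // Murmur3-style integer hash applied to key.hashCode(); result is non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Counts keys (not records) per subtask for the given settings.
    static int[] keysPerSubtask(int numKeys, int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int key = 1; key <= numKeys; key++) {
            int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
            int subtask = keyGroup * parallelism / maxParallelism;
            counts[subtask]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        for (int p = 2; p <= 16; p++) {
            // Assumption: env.setMaxParallelism(p), so there is one key group per subtask.
            System.out.println(p + " " + Arrays.toString(keysPerSubtask(1024, p, p)));
        }
    }
}
```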

Best,
Ovidiu

> On 20 Feb 2017, at 12:04, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> the way Flink works is to assign key group ranges to operators. For each 
> element, you calculate a hash value and, based on it, assign the element to a 
> key group. Thus, in your example, an operator is assigned either a key group 
> with more than one key or multiple key groups with one or more keys each.
> 
> So what you could try is to reduce the number of key groups to your 
> parallelism via env.setMaxParallelism() and then try to find a key whose 
> hashes are uniformly distributed over the key groups. The key group 
> assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
> 
> Alternatively, if you don’t need a keyed stream, you could try to use a custom 
> partitioner via DataStream.partitionCustom.
> 
> Cheers,
> Till
> 
> 
> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> Hi,
> 
> Can you please comment on how I can ensure that stream input records are 
> distributed evenly across task slots?
> See the attached screenshot (Records received issue).
> 
> I have a simple application that applies a window function over a stream 
> partitioned as follows
> (parallelism is equal to the number of keys; records are streamed evenly 
> across the keys):
> 
> // get the execution environment
> final StreamExecutionEnvironment env = 
>     StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> // parse each line into a Tuple8 record (parsing logic elided)
> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, 
>     Long>> input = text.flatMap(...);
> // key by the first tuple field and apply a sliding count window per key
> DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>               .apply(new WindowFunction<Tuple8<String, String, String, 
>                       Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>               ...
>               });
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount");
> 
> Best,
> Ovidiu
> 
> 
