Hi Till,

I will look into filing a JIRA issue.

Regarding the key group assignment, you're right: there was a mistake in my
code. Here are the corrected code and the resulting distribution
(numServers is maxParallelism):

int numKeys = 1024;
HashMap<Integer, Integer> groups = new HashMap<Integer, Integer>();
for (int numServers = 2; numServers < 17; numServers++) {
    // start with empty counts for each maxParallelism value
    groups = new HashMap<Integer, Integer>();
    for (int i = 0; i < numKeys; i++) {
        int targetKeyGroupIndex = MathUtils.murmurHash(i) % numServers;
        // increment the number of keys seen for this key group
        Integer mygroup = groups.get(targetKeyGroupIndex);
        int count = mygroup == null ? 0 : mygroup;
        groups.put(targetKeyGroupIndex, ++count);
    }
    System.out.println(groups + " " + numServers);
}

{0=517, 1=507} 2
{0=364, 1=302, 2=358} 3
{0=258, 1=239, 2=259, 3=268} 4
{0=180, 1=220, 2=212, 3=205, 4=207} 5
{0=193, 1=157, 2=179, 3=171, 4=145, 5=179} 6
{0=144, 1=161, 2=152, 3=137, 4=160, 5=131, 6=139} 7
{0=125, 1=132, 2=120, 3=127, 4=133, 5=107, 6=139, 7=141} 8
{0=120, 1=110, 2=115, 3=123, 4=93, 5=112, 6=121, 7=99, 8=131} 9
{0=95, 1=106, 2=98, 3=103, 4=108, 5=85, 6=114, 7=114, 8=102, 9=99} 10
{0=98, 1=83, 2=84, 3=92, 4=89, 5=99, 6=97, 7=80, 8=126, 9=75, 10=101} 11
{0=98, 1=74, 2=92, 3=90, 4=73, 5=84, 6=95, 7=83, 8=87, 9=81, 10=72, 11=95} 12
{0=65, 1=84, 2=72, 3=80, 4=71, 5=85, 6=80, 7=79, 8=78, 9=85, 10=81, 11=91, 12=73} 13
{0=73, 1=83, 2=75, 3=62, 4=81, 5=69, 6=73, 7=71, 8=78, 9=77, 10=75, 11=79, 12=62, 13=66} 14
{0=67, 1=65, 2=81, 3=84, 4=73, 5=57, 6=76, 7=56, 8=69, 9=62, 10=56, 11=79, 12=75, 13=52, 14=72} 15
{0=57, 1=72, 2=52, 3=61, 4=63, 5=47, 6=64, 7=80, 8=68, 9=60, 10=68, 11=66, 12=70, 13=60, 14=75, 15=61} 16
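
A minimal, self-contained sketch for sanity-checking counts like the ones above: for every group count, the per-group numbers should sum to numKeys (1024). It is only an illustration under stated assumptions. The class name KeyGroupSkew and the methods mix() and countPerGroup() are hypothetical, and mix() is a stand-in modeled on the MurmurHash3 32-bit finalizer, so it is not guaranteed to reproduce the exact values of Flink's MathUtils.murmurHash.

```java
import java.util.Arrays;

public class KeyGroupSkew {

    // Stand-in integer mixer modeled on the MurmurHash3 32-bit finalizer.
    // Assumption: NOT guaranteed to match Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Counts how many of the keys 0..numKeys-1 land in each of numGroups groups.
    static int[] countPerGroup(int numKeys, int numGroups) {
        // A fresh array per call, so counts never accumulate across runs.
        int[] counts = new int[numGroups];
        for (int key = 0; key < numKeys; key++) {
            // floorMod keeps the index non-negative even for negative hashes.
            int group = Math.floorMod(mix(key), numGroups);
            counts[group]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numKeys = 1024;
        for (int numGroups = 2; numGroups < 17; numGroups++) {
            int[] counts = countPerGroup(numKeys, numGroups);
            int sum = Arrays.stream(counts).sum();
            int min = Arrays.stream(counts).min().getAsInt();
            int max = Arrays.stream(counts).max().getAsInt();
            // sum should always equal numKeys; max vs. min gives a rough skew measure.
            System.out.println(numGroups + " groups: sum=" + sum
                    + " min=" + min + " max=" + max);
        }
    }
}
```

If the printed sum ever differs from numKeys, the counts are being carried over between iterations rather than reset.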


Best,
Ovidiu

> On 21 Feb 2017, at 10:52, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> at the moment it is not possible to plug in a user-defined hash function/key
> group assignment function. If you like, you can file a JIRA issue to
> add this functionality.
> 
> The key group assignment in your example looks quite skewed. One question
> concerning how you calculated it: shouldn't the number of elements in each
> group sum up to 1024? This only holds for the first case. What do the
> numbers mean then?
> 
> Cheers,
> Till
> 
> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU
> <ovidiu-cristian.ma...@inria.fr> wrote:
> 
>> Hi,
>> 
>> Thank you for the clarifications (I am working with a KeyedStream, so a
>> custom partitioner does not help).
>> 
>> So I should set maxParallelism >= parallelism and change my keys (from
>> input.keyBy(0)) such that the key group assignment works as expected,
>> but I can’t modify these keys in order to make it work.
>> 
>> The other option is to change Flink’s internals in order to evenly
>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
>> What I was looking for was an API to change the way key group assignment
>> is done, but without changing Flink’s runtime.
>> 
>> I think that the maxParallelism setting is not enough (it introduces this
>> inefficient way of distributing data for processing when using KeyedStream).
>> Is it possible to somehow expose the key group assignment?
>> 
>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
>> 2 to 16, equivalent to the parallelism, i.e. the number of slots):
>> 
>> {0=517, 1=507} 2
>> {0=881, 1=809, 2=358} 3
>> {0=1139, 1=1048, 2=617, 3=268} 4
>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
>> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
>> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
>> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
>> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
>> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
>> 
>> Best,
>> Ovidiu
>> 
>>> On 20 Feb 2017, at 12:04, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Hi Ovidiu,
>>> 
>>> the way Flink works is to assign key group ranges to operators. For each
>>> element you calculate a hash value and, based on that, you assign it to a
>>> key group. Thus, in your example, you have either a key group with more
>>> than 1 key or multiple key groups with 1 or more keys assigned to an
>>> operator.
>>> 
>>> So what you could try to do is to reduce the number of key groups to
>>> your parallelism via env.setMaxParallelism() and then try to find a key
>>> whose hashes are uniformly distributed over the key groups. The key
>>> group assignment is calculated via murmurHash(key.hashCode()) %
>>> maxParallelism.
>>> 
>>> Alternatively, if you don’t need a keyed stream, you could try to use a
>>> custom partitioner via DataStream.partitionCustom.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> 
>>> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU
>>> <ovidiu-cristian.ma...@inria.fr> wrote:
>>> Hi,
>>> 
>>> Can you please comment on how I can ensure that stream input records are
>>> distributed evenly onto task slots?
>>> See the attached screenshot of the "Records received" issue.
>>> 
>>> I have a simple application which applies a window function over
>>> a stream partitioned as follows:
>>> (parallelism is equal to the number of keys; records with the same key
>>> are streamed evenly)
>>> 
>>> // get the execution environment
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> // get input data by connecting to the socket
>>> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
>>> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input =
>>>     text.flatMap(...);
>>> DataStream<Double> counts1 = null;
>>> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>>>     .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>>>         ...
>>>     });
>>> counts1.writeAsText(params.get("output1"));
>>> env.execute("Socket Window WordCount");
>>> 
>>> Best,
>>> Ovidiu
