It's a valid argument, and I am not against changing rebalance() to use the
formula you suggested.

I just don't see it as a bug, only as an unfortunate behavior caused by
implementation details that occurs only on tiny data sets (which are not
a target use case).
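To make the effect concrete, here is a minimal plain-Java simulation (not Flink's partitioner classes; the class and method names are hypothetical). It models 100 senders that each emit 14 records to 100 receivers, assuming each sender simply cycles through receiver indices starting from its own start index. It runs once with every sender starting at index 0, and once with the suggested startIdx = (numReceivers / numSenders) * myIdx.

```java
import java.util.Arrays;

// Hypothetical simulation of independent round-robin senders; not Flink code.
public class RoundRobinOffsetDemo {

    // Returns how many records each receiver gets.
    // useOffset == false: every sender starts at index 0.
    // useOffset == true:  sender myIdx starts at (numReceivers / numSenders) * myIdx.
    static int[] distribute(int numSenders, int numReceivers, int elementsPerSender, boolean useOffset) {
        int[] received = new int[numReceivers];
        for (int myIdx = 0; myIdx < numSenders; myIdx++) {
            int startIdx = useOffset ? (numReceivers / numSenders) * myIdx : 0;
            for (int i = 0; i < elementsPerSender; i++) {
                received[(startIdx + i) % numReceivers]++;
            }
        }
        return received;
    }

    // Counts receivers that got at least one record.
    static long busyReceivers(int[] received) {
        return Arrays.stream(received).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        int[] current = distribute(100, 100, 14, false);
        int[] offset = distribute(100, 100, 14, true);
        System.out.println("busy receivers, start at 0:      " + busyReceivers(current)); // 14
        System.out.println("busy receivers, startIdx offset: " + busyReceivers(offset));  // 100
    }
}
```

With these numbers every receiver gets exactly 14 records in the offset case. Note that with integer division the term (numReceivers / numSenders) becomes 0 whenever there are more senders than receivers, so in that configuration all senders would again start at index 0.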

I will open a JIRA for it if there are no objections.

-Matthias

On 09/03/2015 04:06 PM, Fabian Hueske wrote:
> The purpose of rebalance() should be to rebalance the partitions of a data
> stream as evenly as possible, right?
> If all senders start sending data to the same receiver and each partition
> holds fewer elements than there are receivers, the partitions are not
> evenly rebalanced.
> That is exactly the problem Arnaud ran into.
> 
> IMO, that's a bug and should be fixed.
> 
> 2015-09-03 15:53 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
> 
>> For rebalance() this makes sense. I don't think anything needs to be
>> changed. With regular data there are no issues like the ones with this
>> very small data set.
>>
>> However, for shuffle() I would expect each source task to use a
>> different shuffle pattern...
>>
>> -Matthias
>>
>> On 09/03/2015 03:28 PM, Fabian Hueske wrote:
>>> In the case of rebalance(), all sources start the round-robin partitioning
>>> at index 0. Since each source emits only very few elements, only the first
>>> 15 mappers receive any input.
>>> It would be better to let each source start the round-robin partitioning
>>> at a different index, something like
>>> startIdx = (numReceivers / numSenders) * myIdx.
>>>
>>> In the case of shuffle(), the ShufflePartitioner initializes Random()
>>> without a seed (so the current time is used).
>>> However, the ShufflePartitioner is only initialized once on the client
>>> side (if I see that correctly), and then the same instance is deserialized
>>> by all operators, i.e., all of them use random number generators with the
>>> same seed.
>>>
>>> I think the StreamPartitioner class should be extended with a
>>> configuration / initialize method that is called on each parallel
>>> operator.
>>>
>>> Cheers, Fabian
>>>
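The seed problem can be reproduced without Flink. The sketch below uses a hypothetical stand-in for the ShufflePartitioner (it is not Flink's class): a java.util.Random is created once, the object is shipped via Java serialization, and every deserialized copy therefore continues from the same seed. The setup() method is a hypothetical version of the per-operator initialize method proposed above; it simply constructs a fresh Random inside each task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Random;

public class ShuffleSeedDemo {

    // Hypothetical stand-in for a shuffle partitioner (not Flink's class).
    static class ShufflePartitionerSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        private Random random = new Random(); // seed fixed once, on the "client"

        int selectChannel(int numChannels) {
            return random.nextInt(numChannels);
        }

        // Hypothetical per-task initialize hook: each task builds its own
        // Random, so the tasks no longer share one seed.
        void setup() {
            random = new Random();
        }
    }

    // Serializes and deserializes the object, mimicking how each parallel
    // task would receive its own copy of the same shipped instance.
    static ShufflePartitionerSketch roundTrip(ShufflePartitionerSketch p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ShufflePartitionerSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The first n channels a partitioner copy would pick.
    static int[] firstChoices(ShufflePartitionerSketch p, int n, int numChannels) {
        int[] choices = new int[n];
        for (int i = 0; i < n; i++) {
            choices[i] = p.selectChannel(numChannels);
        }
        return choices;
    }

    public static void main(String[] args) {
        ShufflePartitionerSketch shipped = new ShufflePartitionerSketch();
        ShufflePartitionerSketch task0 = roundTrip(shipped);
        ShufflePartitionerSketch task1 = roundTrip(shipped);
        // Both copies carry the same Random state, so they pick the same channels.
        System.out.println("same pattern without setup: "
                + Arrays.equals(firstChoices(task0, 14, 100), firstChoices(task1, 14, 100)));

        task0.setup();
        task1.setup();
        System.out.println("task0 after setup: " + Arrays.toString(firstChoices(task0, 14, 100)));
        System.out.println("task1 after setup: " + Arrays.toString(firstChoices(task1, 14, 100)));
    }
}
```

The first line should print true. After setup(), the two copies draw from independently seeded generators, so their patterns will differ in practice, although with random numbers that is not strictly guaranteed.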
>>> 2015-09-03 15:04 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> Hi,
>>>> I don't think it's a bug. If there are 100 sources that each emit only
>>>> 14 elements, then only the first 14 mappers will ever receive data. The
>>>> round-robin distribution is not global, since the sources operate
>>>> independently of each other.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
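The point that the round-robin position is per source, not global, can be illustrated with a small plain-Java model. It assumes, hypothetically, that rebalance() behaves like a counter owned by each sender that starts at channel 0 (the class names are illustrative, not Flink's). It compares 100 independent senders with 14 records each against a single sender emitting the same 1,400 records, which corresponds to the parallelism-1 source workaround described further down.

```java
// Hypothetical model of rebalance(): every sender owns a round-robin counter
// that starts at channel 0; senders share no state with each other.
public class IndependentRoundRobinDemo {

    static final class RoundRobinSelector {
        private int next = 0; // starts at 0 on every sender

        int selectChannel(int numChannels) {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    // Number of receivers that see at least one record when numSenders
    // senders each emit recordsPerSender records through their own selector.
    static int busyReceivers(int numSenders, int numReceivers, int recordsPerSender) {
        boolean[] busy = new boolean[numReceivers];
        for (int s = 0; s < numSenders; s++) {
            RoundRobinSelector selector = new RoundRobinSelector(); // one per sender
            for (int r = 0; r < recordsPerSender; r++) {
                busy[selector.selectChannel(numReceivers)] = true;
            }
        }
        int count = 0;
        for (boolean b : busy) {
            if (b) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // 100 independent senders x 14 records: only receivers 0..13 get data.
        System.out.println(busyReceivers(100, 100, 14)); // 14
        // A single sender emitting the same 1,400 records reaches every receiver.
        System.out.println(busyReceivers(1, 100, 1400)); // 100
    }
}
```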
>>>> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:
>>>>
>>>>> Thanks for clarifying. shuffle() is similar to rebalance(), but it
>>>>> redistributes randomly rather than in a round-robin fashion.
>>>>>
>>>>> However, the problem you describe sounds like a bug to me. I included
>>>>> the dev list. Maybe someone else can step in so we can determine whether
>>>>> there is a bug or not.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>>>>>> Hi,
>>>>>>
>>>>>> You are right, but in fact it does not solve my problem, since I have a
>>>>>> parallelism of 100 everywhere. Each of my 100 sources gives only a few
>>>>>> lines (say 14 max), and only the first 14 downstream nodes will receive
>>>>>> data.
>>>>>> The same problem occurs when replacing rebalance() with shuffle().
>>>>>>
>>>>>> But I found a workaround: setting the parallelism of the source to 1 (I
>>>>>> don't need 100 directory scanners anyway) forces the lines to be
>>>>>> rebalanced evenly across the mappers.
>>>>>>
>>>>>> Greetings,
>>>>>> Arnaud
>>>>>>
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>>> Sent: Wednesday, September 2, 2015 17:56
>>>>>> To: u...@flink.apache.org
>>>>>> Subject: Re: How to force the parallelism on small streams?
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> If I understand you correctly, you want to have 100 mappers. Thus you
>>>>>> need to apply .setParallelism() after .map():
>>>>>>
>>>>>>> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>>>>>>
>>>>>> The order of calls you used sets the dop (degree of parallelism) of the
>>>>>> source to 100 (which might be ignored if the provided source function
>>>>>> "myFileSource" does not implement the "ParallelSourceFunction"
>>>>>> interface). The dop of the mapper should be the default value.
>>>>>>
>>>>>> Using .rebalance() is absolutely correct. It distributes the emitted
>>>>>> tuples in a round-robin fashion to all consumer tasks.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have a source that provides few items, since it gives file names to
>>>>>>> the mappers. The mapper opens the file and processes the records. As
>>>>>>> the files are huge, one input line (a file name) generates substantial
>>>>>>> work for the next stage.
>>>>>>>
>>>>>>> My topology looks like this:
>>>>>>>
>>>>>>> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>>>>>>>
>>>>>>> If 100 mappers are created, about 85 end immediately and only a few
>>>>>>> process the files (for hours). I suspect an optimization whereby a node
>>>>>>> that receives fewer than some minimum number of lines is "shut down",
>>>>>>> but in my case I do want the lines to be evenly distributed across all
>>>>>>> mappers.
>>>>>>>
>>>>>>> How can I enforce that?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Greetings,
>>>>>>>
>>>>>>> Arnaud
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
> 
