It's a valid argument, and I am not against changing rebalance() to use the formula you suggested.
I just don't see it as a bug -- only an unfortunate behavior due to implementation details that only occurs on tiny data sets (which are not a target application). I will open a JIRA for it if there are no objections.

-Matthias

On 09/03/2015 04:06 PM, Fabian Hueske wrote:
> The purpose of rebalance() should be to rebalance the partitions of a data stream as evenly as possible, right?
> If all senders start sending data to the same receiver and each partition holds fewer records than there are receivers, the partitions are not evenly rebalanced.
> That is exactly the problem Arnaud ran into.
>
> IMO, that's a bug and should be fixed.
>
> 2015-09-03 15:53 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
>
>> For rebalance() this makes sense. I don't think anything needs to be changed. With regular data volumes this issue does not arise; it only shows up with very small data sets like this one.
>>
>> However, for shuffle() I would expect each source task to use a different shuffle pattern...
>>
>> -Matthias
>>
>> On 09/03/2015 03:28 PM, Fabian Hueske wrote:
>>> In case of rebalance(), all sources start the round-robin partitioning at index 0. Since each source emits only very few elements, only the first 15 mappers receive any input.
>>> It would be better to let each source start the round-robin partitioning at a different index, something like startIdx = (numReceivers / numSenders) * myIdx.
>>>
>>> In case of shuffle(), the ShufflePartitioner initializes Random() without a seed (the current time is taken).
>>> However, the ShufflePartitioner is only initialized once on the client side (if I see that correctly), and then the same instance is deserialized by all operators, i.e., all of them use random number generators with the same seed.
>>>
>>> I think the StreamPartitioner class should be extended with a configuration / initialize method that is called on each parallel operator.
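Fabian's shuffle() point can be made concrete with a minimal Java sketch. It assumes a toy partitioner rather than Flink's actual ShufflePartitioner: `ShuffleSeedSim`, `ToyShufflePartitioner`, `roundTrip`, `pick`, and the `setup()` hook are all hypothetical names. One instance is created on the "client", serialized, and deserialized twice to mimic shipping it to two parallel senders. Both copies carry the same `java.util.Random` state and therefore pick identical channels. Calling the hypothetical per-instance `setup()` afterwards, in the spirit of the initialize method proposed above, gives each copy its own generator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Random;

/** Hypothetical sketch, not Flink code: one serialized partitioner shared by all senders. */
public class ShuffleSeedSim {

    /** Toy stand-in for a shuffle partitioner; setup() is a hypothetical hook. */
    static final class ToyShufflePartitioner implements Serializable {
        private static final long serialVersionUID = 1L;

        // Created once on the "client"; its internal seed state is serialized with it.
        private Random random = new Random();

        int selectChannel(int numReceivers) {
            return random.nextInt(numReceivers);
        }

        /** Hypothetical per-instance initialization, called after deserialization. */
        void setup() {
            // new Random() picks a seed very likely distinct from other invocations.
            random = new Random();
        }
    }

    /** Serializes and deserializes obj, mimicking shipping it to a parallel task. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    /** Channels chosen by one sender for numRecords consecutive records. */
    static int[] pick(ToyShufflePartitioner p, int numReceivers, int numRecords) {
        int[] channels = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            channels[i] = p.selectChannel(numReceivers);
        }
        return channels;
    }

    public static void main(String[] args) {
        ToyShufflePartitioner onClient = new ToyShufflePartitioner();
        ToyShufflePartitioner senderA = roundTrip(onClient);
        ToyShufflePartitioner senderB = roundTrip(onClient);
        // Same deserialized Random state, so both senders pick the same channels.
        System.out.println("without setup(): "
                + Arrays.equals(pick(senderA, 100, 14), pick(senderB, 100, 14)));

        senderA.setup();
        senderB.setup();
        // Independent generators: almost certainly different channel sequences.
        System.out.println("with setup():    "
                + Arrays.equals(pick(senderA, 100, 14), pick(senderB, 100, 14)));
    }
}
```

The first line prints `without setup(): true`. The second comparison depends on freshly chosen seeds, so it is only very likely to be `false`.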
>>>
>>> Cheers, Fabian
>>>
>>> 2015-09-03 15:04 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> Hi,
>>>> I don't think it's a bug. If there are 100 sources that each emit only 14 elements, then only the first 14 mappers will ever receive data. The round-robin distribution is not global, since the sources operate independently of each other.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:
>>>>
>>>>> Thanks for clarifying. shuffle() is similar to rebalance() -- however, it redistributes randomly and not in round-robin fashion.
>>>>>
>>>>> That said, the problem you describe sounds like a bug to me. I included the dev list. Maybe someone else can step in so we can determine whether there is a bug or not.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>>>>>> Hi,
>>>>>>
>>>>>> You are right, but in fact it does not solve my problem, since I have a parallelism of 100 everywhere. Each of my 100 sources emits only a few lines (say 14 at most), so only the first 14 downstream nodes receive data.
>>>>>> Same problem when replacing rebalance() with shuffle().
>>>>>>
>>>>>> But I found a workaround: setting the parallelism of the source to 1 (I don't need 100 directory scanners anyway) forces the records to be rebalanced evenly across the mappers.
>>>>>>
>>>>>> Greetings,
>>>>>> Arnaud
>>>>>>
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>>> Sent: Wednesday, September 2, 2015 17:56
>>>>>> To: u...@flink.apache.org
>>>>>> Subject: Re: How to force the parallelism on small streams?
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> If I understand you correctly, you want to have 100 mappers.
>>>>>> Thus, you need to apply .setParallelism() after .map():
>>>>>>
>>>>>>     addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>>>>>>
>>>>>> The order of calls you used sets the dop (degree of parallelism) of the source to 100 (which might be ignored if the provided source function "myFileSource" does not implement the "ParallelSourceFunction" interface). The dop of the mapper is then the default value.
>>>>>>
>>>>>> Using .rebalance() is absolutely correct. It distributes the emitted tuples in a round-robin fashion to all consumer tasks.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a source that provides only a few items, since it passes file names to the mappers. Each mapper opens a file and processes its records. As the files are huge, one input line (a file name) generates a substantial amount of work for the next stage.
>>>>>>>
>>>>>>> My topology looks like:
>>>>>>>
>>>>>>>     addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>>>>>>>
>>>>>>> If 100 mappers are created, about 85 finish immediately and only a few process the files (for hours). I suspect an optimization whereby a node that does not receive a minimum number of lines is “shut down”; but in my case I do want the lines to be evenly distributed across the mappers.
>>>>>>>
>>>>>>> How can I enforce that?
>>>>>>>
>>>>>>> Greetings,
>>>>>>>
>>>>>>> Arnaud
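Aljoscha's and Fabian's explanations of rebalance() can be checked with a small simulation. This Java sketch is not Flink code. `RebalanceSim`, `RoundRobinSender`, and `busyReceivers` are hypothetical names, and each sender is modeled as keeping its own independent round-robin counter, which is the behavior described in the thread. The sketch counts how many of 100 receivers get at least one record in three cases:

- 100 senders with 14 records each, all starting at index 0 (Arnaud's setup).
- The same senders, staggered with Fabian's `startIdx = (numReceivers / numSenders) * myIdx`.
- A single sender with 1,400 records (Arnaud's parallelism-1 workaround). The 1,400 total is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical simulation, not Flink code: per-sender round-robin partitioning. */
public class RebalanceSim {

    /** One sender's independent round-robin state; startIdx is where its first record goes. */
    static final class RoundRobinSender {
        private final int numReceivers;
        private int next;

        RoundRobinSender(int startIdx, int numReceivers) {
            this.numReceivers = numReceivers;
            this.next = startIdx % numReceivers;
        }

        int selectChannel() {
            int channel = next;
            next = (next + 1) % numReceivers;
            return channel;
        }
    }

    /** Returns how many distinct receivers get at least one record. */
    static int busyReceivers(int numSenders, int recordsPerSender, int numReceivers, boolean staggered) {
        Set<Integer> busy = new HashSet<>();
        for (int myIdx = 0; myIdx < numSenders; myIdx++) {
            // Fabian's suggested start index, or 0 for every sender.
            int startIdx = staggered ? (numReceivers / numSenders) * myIdx : 0;
            RoundRobinSender sender = new RoundRobinSender(startIdx, numReceivers);
            for (int r = 0; r < recordsPerSender; r++) {
                busy.add(sender.selectChannel());
            }
        }
        return busy.size();
    }

    public static void main(String[] args) {
        System.out.println("100 senders x 14 records, start at 0: " + busyReceivers(100, 14, 100, false));
        System.out.println("100 senders x 14 records, staggered:  " + busyReceivers(100, 14, 100, true));
        System.out.println("1 sender x 1400 records, start at 0:  " + busyReceivers(1, 1400, 100, false));
    }
}
```

With these inputs the program prints 14, 100, and 100 busy receivers. One caveat on the formula as written: with Java integer division, `numReceivers / numSenders` is 0 whenever there are more senders than receivers, so every sender would again start at index 0. A variant such as `(myIdx * numReceivers) / numSenders` avoids that. This variant is only a suggestion here, not something proposed in the thread.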