[
https://issues.apache.org/jira/browse/STORM-162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990818#comment-14990818
]
ASF GitHub Bot commented on STORM-162:
--------------------------------------
Github user redsanket commented on a diff in the pull request:
https://github.com/apache/storm/pull/847#discussion_r43963775
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -44,51 +45,53 @@
 (defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
   (let [num-tasks (count target-tasks)
         task-getter (fn [i] (.get target-tasks i))]
-    (fn [task-id ^List values]
+    (fn [task-id ^List values load]
       (-> (.select out-fields group-fields values)
           tuple/list-hash-code
           (mod num-tasks)
           task-getter))))

-(defn- mk-shuffle-grouper [^List target-tasks]
-  (let [choices (rotating-random-range target-tasks)]
-    (fn [task-id tuple]
-      (acquire-random-range-id choices))))
-
 (defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
   (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
-  (fn [task-id ^List values]
-    (.chooseTasks grouping task-id values)
-    ))
+  (if (instance? LoadAwareCustomStreamGrouping grouping)
+    (fn [task-id ^List values load]
+      (.chooseTasks grouping task-id values load))
+    (fn [task-id ^List values load]
+      (.chooseTasks grouping task-id values))))
+
+(defn mk-shuffle-grouper [^List target-tasks topo-conf ^WorkerTopologyContext context ^String component-id ^String stream-id]
--- End diff --
Should we move the arg list to the next line, since there are so many
arguments, to make this more readable?
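
For context, the diff above dispatches on whether the grouping implements
LoadAwareCustomStreamGrouping and passes the load along only in that case.
Below is a minimal sketch of what such a grouping could look like from
Clojure, assuming only the method shapes implied by the calls in the diff: a
three-argument chooseTasks, plus a load object with a per-task .get accessor
(that accessor, and the import path, are assumptions, not something the diff
confirms).

    ;; Sketch only, not code from this PR. The import path is assumed from
    ;; where other grouping classes live in storm-core.
    (import '[backtype.storm.grouping LoadAwareCustomStreamGrouping])

    (defn mk-least-loaded-grouping []
      (let [tasks (atom [])]
        (reify LoadAwareCustomStreamGrouping
          (prepare [this context stream-id target-tasks]
            (reset! tasks (vec target-tasks)))
          (chooseTasks [this task-id values load]
            ;; route each tuple to the task currently reporting the lowest
            ;; load (assumes (.get load task) returns a load figure)
            [(apply min-key #(.get load %) @tasks)]))))

A plain CustomStreamGrouping keeps the two-argument chooseTasks; per the diff,
the wrapping fn accepts the load argument and simply ignores it.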
> Load Balancing Shuffle Grouping
> -------------------------------
>
> Key: STORM-162
> URL: https://issues.apache.org/jira/browse/STORM-162
> Project: Apache Storm
> Issue Type: Wish
> Components: storm-core
> Reporter: James Xu
> Assignee: Robert Joseph Evans
> Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/571
> Hey @nathanmarz,
> I think that the current shuffle grouping is creating very obvious hot spots
> in load on hosts here at Twitter. The reason is that randomized message
> distribution to the workers is susceptible to the balls-and-bins problem
> (http://pages.cs.wisc.edu/~shuchi/courses/787-F07/scribe-notes/lecture07.pdf):
> the odds that some particular queue gets bogged down when you're assigning
> tasks randomly are high. You can solve this problem with a load-aware shuffle
> grouping -- when shuffling, prefer tasks with lower load.
> What would it take to implement this feature?
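> A minimal sketch of that idea in plain Clojure (the load-map here is a
> hypothetical map from task id to a load figure, not an existing Storm
> structure): sample two candidate tasks at random and keep the less loaded
> one, the standard "power of two choices" remedy for balls and bins.
>
>     (defn choose-task [target-tasks load-map]
>       ;; sample two candidates uniformly at random
>       (let [a (rand-nth target-tasks)
>             b (rand-nth target-tasks)]
>         ;; prefer whichever candidate the load map reports as less loaded
>         (if (<= (get load-map a 0.0) (get load-map b 0.0)) a b)))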
> ----------
> sritchie: Looks like Rap Genius was heavily affected when Heroku started
> running a "shuffle grouping" on tasks to its dynos:
> http://rapgenius.com/James-somers-herokus-ugly-secret-lyrics
> They saw a 50x performance degradation compared to a more intelligent
> load-balancing scheme that sent tasks only to non-busy dynos. Seems very
> relevant to Storm.
> ----------
> nathanmarz: It's doing randomized round robin, not fully random distribution,
> so every downstream task gets the same number of messages. But yes, I agree
> that this would be a great feature. Basically what this requires is making
> stats of downstream tasks available to the stream grouping code. The best way
> to implement this would be:
> 1. Implement a broadcast message type in the networking code, so that one can
>    efficiently send a large object to all tasks in a worker (rather than
>    having to send N copies of that large message).
> 2. Have a single executor in every topology that polls nimbus for accumulated
>    stats once per minute and then broadcasts that information to all tasks in
>    all workers.
> 3. Wire up the task code to pass that information along from the task to the
>    outgoing stream groupings for that task (adding appropriate methods to the
>    CustomStreamGrouping interface to receive the stats info).
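> As a sketch of the "randomized round robin" described above (plain Clojure,
> not Storm's actual rotating-random-range implementation): shuffle the target
> tasks, deal them out one by one, and reshuffle once the list is exhausted,
> so every downstream task receives exactly the same number of tuples while
> the order stays unpredictable.
>
>     (defn mk-round-robin-chooser [target-tasks]
>       (let [remaining (atom [])]
>         (fn []
>           ;; reshuffle after every task has been handed out exactly once
>           (when (empty? @remaining)
>             (reset! remaining (shuffle target-tasks)))
>           (let [[t & more] @remaining]
>             (reset! remaining (vec more))
>             t))))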
> ----------
> sorenmacbeth: @nathanmarz @sritchie Did any progress ever get made on this?
> Is the description above still relevant to Storm 0.9.0? We are getting bitten
> by this problem and would love to see something like this implemented.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)