GitHub user redsanket commented on a diff in the pull request:
https://github.com/apache/storm/pull/847#discussion_r43963775
--- Diff: storm-core/src/clj/backtype/storm/daemon/executor.clj ---
@@ -44,51 +45,53 @@
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List
target-tasks]
(let [num-tasks (count target-tasks)
task-getter (fn [i] (.get target-tasks i))]
- (fn [task-id ^List values]
+ (fn [task-id ^List values load]
(-> (.select out-fields group-fields values)
tuple/list-hash-code
(mod num-tasks)
task-getter))))
-(defn- mk-shuffle-grouper [^List target-tasks]
- (let [choices (rotating-random-range target-tasks)]
- (fn [task-id tuple]
- (acquire-random-range-id choices))))
-
(defn- mk-custom-grouper [^CustomStreamGrouping grouping
^WorkerTopologyContext context ^String component-id ^String stream-id
target-tasks]
(.prepare grouping context (GlobalStreamId. component-id stream-id)
target-tasks)
- (fn [task-id ^List values]
- (.chooseTasks grouping task-id values)
- ))
+ (if (instance? LoadAwareCustomStreamGrouping grouping)
+ (fn [task-id ^List values load]
+ (.chooseTasks grouping task-id values load))
+ (fn [task-id ^List values load]
+ (.chooseTasks grouping task-id values))))
+
+(defn mk-shuffle-grouper [^List target-tasks topo-conf
^WorkerTopologyContext context ^String component-id ^String stream-id]
--- End diff --
Should we move the arg list to the next line? With this many arguments, that
would be more readable.
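
For illustration, one possible layout is sketched below. It shows only the
signature; the function body is not part of this hunk, so it is left as a
placeholder comment. The type hints assume the imports already used elsewhere
in executor.clj (List, WorkerTopologyContext).

```clojure
;; Layout sketch only: argument vector moved to its own line and wrapped.
(defn mk-shuffle-grouper
  [^List target-tasks topo-conf ^WorkerTopologyContext context
   ^String component-id ^String stream-id]
  ;; function body as in the PR (not shown in this hunk)
  )
```

Putting the argument vector on its own line would also match how multi-line
signatures are commonly formatted in Clojure code.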