[
https://issues.apache.org/jira/browse/STORM-756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15023865#comment-15023865
]
ASF GitHub Bot commented on STORM-756:
--------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/897#discussion_r45701129
--- Diff: storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---
@@ -376,15 +373,18 @@ public void run() {
sendHeartbeatFlag.compareAndSet(true, false);
}
- Object write = _pendingWrites.poll(1, SECONDS);
- if (write instanceof BoltMsg) {
- _process.writeBoltMsg((BoltMsg) write);
- } else if (write instanceof List<?>) {
- _process.writeTaskIds((List<Integer>)write);
- } else if (write != null) {
- throw new RuntimeException("Unknown class type to
write: " + write.getClass().getName());
+ List<Integer> taskIds = _pendingTaskIds.peek();
--- End diff --
Implemented ShellBoltMessageQueue which accomplishes two things at once,
* polling with priority that task ids > bolt message
* when poll() is called, waiting up to the specified wait time if necessary
for an any kind (task ids / bolt message) of element to become available
** same as current ShellBolt implementation
> [multilang] Introduce overflow control mechanism
> ------------------------------------------------
>
> Key: STORM-756
> URL: https://issues.apache.org/jira/browse/STORM-756
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-multilang
> Affects Versions: 0.10.0, 0.9.4, 0.11.0
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
>
> It's from STORM-738,
> https://issues.apache.org/jira/browse/STORM-738?focusedCommentId=14394106&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14394106
> A. ShellBolt side control
> We can modify ShellBolt to have sent tuple ids list, and stop sending tuples
> when list exceeds configured max value. In order to achieve this, subprocess
> should notify "tuple id is complete" to ShellBolt.
> * It introduces new commands for multi-lang, "proceed" (or better name)
> * ShellBolt stores in-progress-of-processing tuples list.
> * Its overhead could be big, subprocess should always notify to ShellBolt
> when any tuples are processed.
> B. subprocess side control
> We can modify subprocess to check pending queue after reading tuple.
> If it exceeds configured max value, subprocess can request "delay" to
> ShellBolt for slowing down.
> When ShellBolt receives "delay", BoltWriterRunnable should stop polling
> pending queue and continue polling later.
> How long ShellBolt wait for resending? Its unit would be "delay time" or
> "tuple count". I don't know which is better yet.
> * It introduces new commands for multi-lang, "delay" (or better name)
> * I don't think it would be introduced soon, but subprocess can request delay
> based on own statistics. (ex. pending tuple count * average tuple processed
> time for time unit, average pending tuple count for count unit)
> ** We can leave when and how much to request "delay" to user. User can make
> his/her own algorithm to control flooding.
> In my opinion B seems to more natural cause current issue is by subprocess
> side so it would be better to let subprocess overcome it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)