[ https://issues.apache.org/jira/browse/STORM-329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14247065#comment-14247065 ]
ASF GitHub Bot commented on STORM-329:
--------------------------------------

Github user clockfly commented on the pull request:

https://github.com/apache/storm/pull/268#issuecomment-67047625

@nathanmarz, I'd like to explain why I need to change worker.clj. This was also motivated by a legacy TODO in zmq.clj:

https://github.com/nathanmarz/storm/blob/0.8.2/src/clj/backtype/storm/messaging/zmq.clj#L43

```
(send [this task message]
  ...
  (mq/send socket message ZMQ/NOBLOCK))
;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
```

As we can see, the zeromq transport sends messages in a non-blocking way. If I understand this TODO correctly, it wants:

a) When the target worker is not booted yet, the source worker should not send messages to it. Otherwise, since there is no backpressure, there will be message loss during the bootup phase. If it is an unacked topology, the message loss is permanent; if it is an acked topology, we will have to do unnecessary replay.

b) When the target worker disappears in the middle (crash?), the source worker should drop the messages directly.

The problem is that the transport layer cannot tell by itself whether the target worker is "booting up" or has "crashed in the running phase", so it cannot smartly choose between "back pressure" and "drop":

- If the transport simply chooses "block", that is good for the bootup phase but bad for the running phase: if one connection is down, it may block messages sent to other connections.
- If the transport simply chooses "drop", that is good for the running phase but bad for the bootup phase: if the target worker boots 30 seconds later, all messages sent during those 30 seconds are dropped.

The changes in worker.clj are targeted at solving this problem. The worker knows when the target worker connections are ready. In the bootup phase, worker.clj waits until the target worker connections are ready, and only then activates the source worker's tasks.
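The bootup-phase gate described above can be sketched as follows. This is a minimal illustration, not Storm's actual code: the class and method names (`ConnectionRegistry`, `markReady`, `awaitAllReady`) are hypothetical, and a real implementation would use a latch or callback rather than polling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the worker delays task activation until every
// outbound connection to a target worker reports ready, so no messages
// are dropped while the cluster is still booting.
public class ConnectionRegistry {
    private final Map<String, Boolean> ready = new ConcurrentHashMap<>();

    // Called once per target worker the source worker must reach.
    public void register(String targetWorker) {
        ready.put(targetWorker, false);
    }

    // Called by the transport when a connection is established.
    public void markReady(String targetWorker) {
        ready.put(targetWorker, true);
    }

    public boolean allReady() {
        return ready.values().stream().allMatch(Boolean::booleanValue);
    }

    // Bootup phase: block task activation until all connections are ready.
    public void awaitAllReady(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!allReady()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("target workers not ready within timeout");
            }
            Thread.sleep(10); // polling keeps the sketch short
        }
    }
}
```

Once `awaitAllReady` returns, the source worker's tasks can be activated; after that point the transport is free to drop messages for any connection that later disappears.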
In the "runtime phase", the transport will simply drop the messages if the target worker crashes in the middle. There are several benefits:

1. During cluster bootup, for an unacked topology, there will be no strange message loss.
2. During cluster bootup, for an acked topology, it takes less time to reach normal throughput, as there is no message loss, timeout, or replay.
3. For the transport layer, the design is simplified: we can just drop the messages if the target worker is not available.

> Add Option to Config Message handling strategy when connection timeout
> ----------------------------------------------------------------------
>
>                 Key: STORM-329
>                 URL: https://issues.apache.org/jira/browse/STORM-329
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Sean Zhong
>            Priority: Minor
>              Labels: Netty
>         Attachments: storm-329.patch, worker-kill-recover3.jpg
>
> This is to address a [concern brought up|https://github.com/apache/incubator-storm/pull/103#issuecomment-43632986] during the work at STORM-297:
> {quote}
> [~revans2] wrote: Your logic makes sense to me on why these calls are blocking. My biggest concern around the blocking is in the case of a worker crashing. If a single worker crashes, this can block the entire topology from executing until that worker comes back up. In some cases I can see that being something you would want. In other cases I can see speed being the primary concern, and some users would like to get partial data fast rather than accurate data later.
> Could we make it configurable on a follow-up JIRA where we can have a max limit to the buffering that is allowed before we block, or throw data away (which is what zeromq does)?
> {quote}
> If some worker crashes suddenly, how should we handle the messages that were supposed to be delivered to that worker?
> 1. Should we buffer all messages infinitely?
> 2. Should we block message sending until the connection is resumed?
> 3. Should we configure a buffer limit, try to buffer the messages first, and block once the limit is met?
> 4. Should we neither block nor buffer too much, but choose to drop the messages and rely on the built-in Storm failover mechanism?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
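Options 3 and 4 from the list above can be sketched with a bounded send buffer whose overflow policy is configurable. This is an illustrative sketch only; `BoundedSendBuffer` and `OverflowPolicy` are hypothetical names, not part of Storm's transport API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: buffer messages up to a configured limit; once the
// limit is met, either block the sender (option 3) or drop the message and
// rely on the topology's ack/replay failover to recover it (option 4).
public class BoundedSendBuffer {
    public enum OverflowPolicy { BLOCK, DROP }

    private final BlockingQueue<byte[]> queue;
    private final OverflowPolicy policy;
    private long dropped = 0;

    public BoundedSendBuffer(int capacity, OverflowPolicy policy) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.policy = policy;
    }

    // Returns true if the message was enqueued, false if it was dropped.
    public boolean offer(byte[] message) throws InterruptedException {
        if (policy == OverflowPolicy.BLOCK) {
            queue.put(message);  // option 3: block once the limit is met
            return true;
        }
        boolean accepted = queue.offer(message);  // option 4: drop on overflow
        if (!accepted) {
            dropped++;  // the lost tuple will fail its ack and be replayed
        }
        return accepted;
    }

    public long droppedCount() { return dropped; }

    public byte[] poll() { return queue.poll(); }
}
```

The trade-off mirrors the discussion above: `BLOCK` preserves every message but lets one dead connection stall the sender, while `DROP` keeps the sender live at the cost of replays in acked topologies and permanent loss in unacked ones.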