[
https://issues.apache.org/jira/browse/STORM-67?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Joseph Evans updated STORM-67:
-------------------------------------
Labels: newbie (was: )
> Provide API for spouts to know how many pending messages there are
> ------------------------------------------------------------------
>
> Key: STORM-67
> URL: https://issues.apache.org/jira/browse/STORM-67
> Project: Apache Storm
> Issue Type: New Feature
> Reporter: James Xu
> Labels: newbie
>
> https://github.com/nathanmarz/storm/issues/343
> This would be useful in case you want to take special action in the spout,
> such as dropping messages.
> -----------------
> Discmt: Hi, I'd like to try and take a crack at this if it's still relevant.
> I'm not exactly sure what it's asking for though. It seems to me an
> implementation for knowing how many pending messages there are for a spout
> depends on where the spout is getting its information from, which makes me
> think I'm missing something.
> -----------------
> revans2: The spout code in backtype/storm/daemon/executor.clj is already
> keeping track of the pending tuples if acking is enabled. If acking is
> disabled nothing is pending.
> defmethod mk-threads :spout [executor-data task-datas]
> defines pending as a RotatingMap which maps all of the storm internal tuple
> ids to the message id objects passed in by the spout when it first emitted
> the tuple. The hardest part should be getting pending to a place where the
> ISpoutOutputCollector implementation, or wherever the API ends up living, can
> access it.
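The bookkeeping @revans2 describes can be pictured with a small toy model. This is an illustrative sketch only; all names are hypothetical, and it is not Storm's actual RotatingMap or executor code:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: while acking is enabled, the spout executor keeps a map from
// Storm-internal tuple ids to the message-id objects the spout passed in on
// emit. Pending count == size of that map.
public class PendingSketch {
    private final Map<Long, Object> pending = new HashMap<>();

    void onEmit(long tupleId, Object messageId) {
        pending.put(tupleId, messageId);  // tuple enters the pending set
    }

    void onAckOrFail(long tupleId) {
        pending.remove(tupleId);          // tuple tree completed (or failed)
    }

    int pendingCount() {
        return pending.size();            // what a getPendingCount() would report
    }

    public static void main(String[] args) {
        PendingSketch s = new PendingSketch();
        s.onEmit(1L, "msg-1");
        s.onEmit(2L, "msg-2");
        System.out.println(s.pendingCount()); // prints 2
        s.onAckOrFail(1L);
        System.out.println(s.pendingCount()); // prints 1
    }
}
```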
> -----------------
> ptgoetz: @Discmt Yes, this is still relevant and would be nice to have.
> The Storm framework asks spouts for tuples by calling the nextTuple() method
> and keeps track of the tuple tree internally. The underlying data source does
> not come into play.
> As implied by @revans2, one approach would be to add a method to
> ISpoutOutputCollector such as getPendingCount() that would allow spout
> implementations to query for the pending count (probably returning -1 if
> acking is disabled). The tricky part will likely be bridging the gap between
> executor.clj and the ISpoutOutputCollector implementation(s). I haven't dug
> very deeply into the code, so off-hand I don't know how hard that would be. A
> quick search of the code for TOPOLOGY_MAX_PENDING should point you to some of
> the touch points.
> Also keep in mind the dual meaning of TOPOLOGY_MAX_PENDING. In a standard
> Storm topology it represents the maximum number of outstanding tuples. In a
> Trident topology it represents the maximum number of outstanding batches.
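The suggestion above could look roughly like this; note that getPendingCount() and the -1 sentinel are proposals from this thread, not an existing Storm API, and the interface name here is a standalone stand-in:

```java
// Hypothetical shape of the proposed API addition. The interface echoes
// Storm's ISpoutOutputCollector, but this is a self-contained sketch.
interface PendingAwareCollector {
    // Number of tuples emitted but not yet fully acked/failed,
    // or -1 if acking is disabled (no pending count is tracked then).
    int getPendingCount();
}

public class AckingDisabledCollector implements PendingAwareCollector {
    @Override
    public int getPendingCount() {
        return -1; // acking disabled: nothing is ever tracked as pending
    }

    public static void main(String[] args) {
        System.out.println(new AckingDisabledCollector().getPendingCount()); // prints -1
    }
}
```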
> -----------------
> Discmt: Hey guys. I've been taking time to look into it, and I feel like I
> might have an understanding of what exactly it is I need to do. If what
> @revans2 said is true, and all pending messages are kept within that
> RotatingMap then this should be somewhat straightforward. I am trying to
> compile my own storm.jar file right now but I haven't figured out how. I
> tried using build_release.sh in the bin directory, but I had no luck. I also
> tried using lein jar.
> -----------------
> xumingming: try the following:
> lein sub install
> lein install
> after these commands are executed, there should be a jar file named
> storm-xxx.jar in $STORM_HOME/target/.
> -----------------
> Discmt: @xumingming. Thanks for the advice. I found that I had Leiningen 1,
> but the minimum required is Leiningen 2.
> -----------------
> xumingming: yeah, storm requires lein 2 to build:
> https://github.com/nathanmarz/storm/blob/master/project.clj#L14
> -----------------
> Discmt: Hi guys. I got my development environment squared away and I can
> properly build releases now, using the build_release.sh script. I tried
> making the change the way @ptgoetz and @revans2 suggested, by adding a
> method to the output collector to return the pending count. I have some
> questions about it.
> I noticed most of the collector implementations rely on a delegate, or
> mediator, which I'm assuming is defined here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L504-515.
> So if I add a method to get the size of pending, defined here
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/clj/backtype/storm/daemon/executor.clj#L408-414,
> like so:
> (SpoutOutputCollector.
>  (reify ISpoutOutputCollector
>    (^int getPendingCount [this]
>      (.size pending))
>    (^List emit [this ^String stream-id ^List tuple ^Object message-id]
>      (send-spout-msg stream-id tuple message-id nil))
>    (^void emitDirect [this ^int out-task-id ^String stream-id ^List tuple
>                       ^Object message-id]
>      (send-spout-msg stream-id tuple message-id out-task-id))
>    (reportError [this error]
>      (report-error error))))
> I should be good, right? Aside from the collectors of the Trident spouts,
> which may take more research.
> Just so I'm clear, messages are considered pending if they have left the
> spout and are waiting to be "fully processed", as defined here:
> https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing ?
> My last question is: does anyone have any suggestions on what would be a good
> way to test this? i.e. "What kind of topology/scenario should I run".
> -----------------
> revans2: The code you pasted above is creating a new SpoutOutputCollector
> instance wrapping an ISpoutOutputCollector instance. In order for any java
> spout to actually get access to the getPendingCount method, you will need to
> modify ISpoutOutputCollector to have that method defined in it, and also
> update SpoutOutputCollector, etc to also implement that method delegating to
> the ISpoutOutputCollector instance.
> You are correct about the definition of pending.
> Testing is a bit more difficult. You could have a spout that sends out
> messages to a bolt that does not process them. You could then verify that
> each time you send out a message the pending count goes up by 1. But to fully
> test it you would need to have some coordination between the spout and the
> bolt. This is not impossible but you may need to use global values to do so.
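The delegation @revans2 describes might be sketched like this. The class names mirror Storm's (with a "Sketch" suffix), but this is an illustration, not the actual source:

```java
// The executor-side implementation (the reify above) exposes the count; the
// public SpoutOutputCollector wrapper simply forwards the call to it.
interface ISpoutOutputCollectorSketch {
    int getPendingCount();
}

class SpoutOutputCollectorSketch implements ISpoutOutputCollectorSketch {
    private final ISpoutOutputCollectorSketch delegate;

    SpoutOutputCollectorSketch(ISpoutOutputCollectorSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public int getPendingCount() {
        return delegate.getPendingCount(); // delegate to the executor-backed impl
    }
}

public class DelegationDemo {
    public static void main(String[] args) {
        // Pretend the executor currently tracks 3 pending tuples.
        SpoutOutputCollectorSketch collector = new SpoutOutputCollectorSketch(() -> 3);
        System.out.println(collector.getPendingCount()); // prints 3
    }
}
```

The same pattern would apply to any other collector wrappers that delegate to an ISpoutOutputCollector instance.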
> -----------------
> Discmt: @revans2 I got the interface implementation covered. Thank you for
> the testing suggestion, and hints on how to perform it. It's incredibly
> helpful. I think that's a good idea, and I'm going to try and do that to test
> it out.
> -----------------
> nathanmarz: There's testing infrastructure already built that can do this
> kind of tracking. The name in the testing code is "tracked topologies". I'm
> traveling right now so can't give a link but you should be able to find it.
> It's used quite a bit in the tests of the acking system.
> -----------------
> Discmt: @nathanmarz This is also good news. I'll take a look and use what I
> can there to speed up the process for me.
> -----------------
> Discmt: Hi, I'm hoping someone can explain to me how these tracked
> topologies work.
> Particularly I'm confused about these lines here:
> https://github.com/nathanmarz/storm/blob/master/storm-core/test/clj/backtype/storm/integration_test.clj#L224-245
> Looking in integration_test.clj under the function test-acking on line 219
> one of the spouts is told to feed a tuple (.feed feeder1 [1]). Then a
> tracked-wait is called on the topology being tracked to wait for one tuple to
> be emitted from the spout. Afterwards the checker asserts that 0 tuples have
> been acked, but what I expected is that 1 tuple would have been acked because
> it was emitted from the spout. However, this is not the case. Then later on
> checker1 is called again expecting one ack, even though another tuple was not
> fed. Furthermore, feeder2, a second spout, was told to feed a tuple, and as
> expected one ack was there when checker2 checked, but this behavior is not
> the same as the first checker's.
> -----------------
> nathanmarz: tracked-wait waits until the entire tree of processing of the
> tuple has finished. The "checker1" function checks how many tuples on the
> spout have been acked since last time it was called. The topology is set up
> so that acknowledgement of tuple trees is delayed until bolts have received
> multiple tuples. That's why nothing is acked on the spout after the first
> tuple is emitted, but it is when the second tuple is emitted. If you look at
> the logic of branching-bolt and agg-bolt you'll see why this is the case.
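The delayed-ack behavior can be mimicked with a toy bolt that only completes tuple trees once it has buffered two tuples. This is a simulation for intuition only, not Storm's agg-bolt or tracked-topology code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: acks reach the spout only after the "bolt" has received a pair
// of tuples, so the first emit acks nothing and the second acks both trees.
public class DelayedAckDemo {
    private final List<String> buffered = new ArrayList<>();
    private int ackedOnSpout = 0;

    void receive(String tuple) {
        buffered.add(tuple);
        if (buffered.size() == 2) {            // complete trees only in pairs
            ackedOnSpout += buffered.size();
            buffered.clear();
        }
    }

    public static void main(String[] args) {
        DelayedAckDemo bolt = new DelayedAckDemo();
        bolt.receive("t1");
        System.out.println(bolt.ackedOnSpout); // prints 0 -- checker1 sees no acks
        bolt.receive("t2");
        System.out.println(bolt.ackedOnSpout); // prints 2 -- both trees now acked
    }
}
```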
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)