@Jungtaek, yes, you're right. Since most of our use cases are multi-thread spouts, this is not a problem for us.
As to Storm, I think we can use your second workaround for now, and
re-evaluate this when we finish porting the multi-thread spout in Phase 2.

On Fri, Apr 29, 2016 at 6:10 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> FYI: Issue STORM-1742 <https://issues.apache.org/jira/browse/STORM-1742>
> and the related pull request (WIP) #1379
> <https://github.com/apache/storm/pull/1379> are available.
>
> I've also finished a functional test so that you can easily see what I'm
> claiming.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Apr 29, 2016 at 4:59 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> Sorry, a correction which may otherwise confuse someone:
>>
>> Under these circumstances there's no issue with keeping it as is, since
>> *users normally don't implement ack() / fail() as long-blocking
>> methods*.
>>
>> On Fri, Apr 29, 2016 at 4:56 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> Cody,
>>> Thanks for joining the conversation.
>>>
>>> If my understanding is right, the way JStorm handles complete latency
>>> is the same as what Apache Storm currently does. Please refer to
>>> https://github.com/apache/storm/blob/master/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L545
>>>
>>> What I really want to address is when, and in which component, the
>>> completion timestamp is decided.
>>>
>>> If my understanding is right, JStorm separates the threads in the
>>> Spout: one is responsible for outgoing tuples, and the other is
>>> responsible for receiving / handling incoming tuples. Under these
>>> circumstances there's no issue with keeping it as is, since ack() /
>>> fail() are normally not implemented as long-blocking methods.
>>>
>>> But many users implement nextTuple() to sleep for a long time to
>>> throttle themselves (yes, I was one of them), and that decision makes
>>> tuples from the Acker wait for that amount of time as well. There are
>>> some exceptions: throttling is on (via backpressure), or the count of
>>> pending tuples is over max spout pending.
>>>
>>> So I guess the same issue persists in JStorm with the single-thread
>>> mode Spout.
>>>
>>> Please let me know if I'm missing something.
>>>
>>> Thanks!
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Fri, Apr 29, 2016 at 4:32 PM, Cody Innowhere <e.neve...@gmail.com>
>>> wrote:
>>>
>>>> What we do in JStorm is to set the timestamp of a tuple before a
>>>> spout sends it to downstream bolts; then in the spout's ack/fail
>>>> method we get the current timestamp and, by subtracting the original
>>>> ts, get the process latency. Note that this delta includes the
>>>> network cost from spouts to bolts, ser/deser time, bolt processing
>>>> time, and the network cost from the acker back to the original
>>>> spout, i.e., it's almost the whole of the tuple's life cycle.
>>>>
>>>> I'm adding this while porting executor.clj. This way, we don't need
>>>> to care about the time-sync problem.
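For reference, the approach Cody describes would look roughly like the
following in plain Java. This is a minimal sketch with hypothetical names,
not JStorm's actual code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch (hypothetical, not JStorm's code): record a wall-clock
    // timestamp when a tuple leaves the spout, and take the delta when
    // ack()/fail() is called back on the same spout instance. Since emit
    // and ack happen on the same machine, no cross-machine clock sync is
    // needed -- which is Cody's point above.
    public abstract class TimestampingSpoutBase {

        // root tuple message id -> emit timestamp (millis)
        private final Map<Object, Long> emitTimes = new ConcurrentHashMap<>();

        protected void recordEmit(Object msgId) {
            emitTimes.put(msgId, System.currentTimeMillis());
        }

        protected long lifeCycleLatencyMs(Object msgId) {
            Long start = emitTimes.remove(msgId);
            return (start == null) ? -1L : System.currentTimeMillis() - start;
        }

        public void ack(Object msgId) {
            long latencyMs = lifeCycleLatencyMs(msgId);
            // report latencyMs to the metrics system here
        }

        public void fail(Object msgId) {
            long latencyMs = lifeCycleLatencyMs(msgId);
            // a failed tuple still yields a (timeout-bounded) sample
        }
    }

The measured delta covers the network hops, ser/deser, and bolt processing
time -- the "almost the whole tuple life cycle" Cody mentions.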
>>>> On Fri, Apr 29, 2016 at 11:18 AM, Jungtaek Lim <kabh...@gmail.com>
>>>> wrote:
>>>>
>>>>> One way to confirm that my assumption is valid: we could use the
>>>>> sojourn_time_ms currently provided by the queue metrics.
>>>>>
>>>>> We could look at sojourn_time_ms in the '__receive' metrics of the
>>>>> Spout component to verify how long messages from the Acker wait in
>>>>> the Spout's receive queue.
>>>>>
>>>>> And we could also estimate the "waiting time in the transfer queue
>>>>> in the Spout" by looking at sojourn_time_ms in the '__send' metrics
>>>>> of the Spout component, and the "waiting time for ACK_INIT in the
>>>>> receive queue in the Acker" by looking at sojourn_time_ms in the
>>>>> '__receive' metrics of the Acker component.
>>>>>
>>>>> Since I don't have clusters/topologies for a normal use case, I'm
>>>>> not sure what the values normally are, but at least in the metrics
>>>>> from ThroughputVsLatency, sojourn_time_ms in '__send' of the Spout
>>>>> is often close to 0, and sojourn_time_ms in '__receive' of the
>>>>> Acker is less than 2 ms. If the transfer latency of the ACK_INIT
>>>>> message is tiny, the sum of latencies under option 2 would also be
>>>>> tiny, maybe less than 5 ms (just an assumption).
>>>>>
>>>>> I'd really like to see those metric values (including
>>>>> sojourn_time_ms in '__receive' of Bolts) from various live
>>>>> topologies handling normal use cases, to make my assumption solid.
>>>>> Please share if you're logging those metrics.
>>>>>
>>>>> I'll try to go with 2) first, but I'm still open to any ideas /
>>>>> opinions / objections.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
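For anyone who wants to capture the values discussed above, here is a
rough sketch of a metrics consumer against the Storm 1.x IMetricsConsumer
API. The filtering logic is my own guess at the data-point layout (a Map
carrying a "sojourn_time_ms" key), not a documented contract; it would be
registered with something like
conf.registerMetricsConsumer(SojournTimeLogger.class, 1):

    import java.util.Collection;
    import java.util.Map;

    import org.apache.storm.metric.api.IMetricsConsumer;
    import org.apache.storm.task.IErrorReporter;
    import org.apache.storm.task.TopologyContext;

    // Sketch: print sojourn_time_ms from the queue metrics ('__receive',
    // '__send', ...) of every component, so the values can be eyeballed
    // per component/task.
    public class SojournTimeLogger implements IMetricsConsumer {

        @Override
        public void prepare(Map stormConf, Object registrationArgument,
                            TopologyContext context,
                            IErrorReporter errorReporter) {
        }

        @Override
        public void handleDataPoints(TaskInfo taskInfo,
                                     Collection<DataPoint> dataPoints) {
            for (DataPoint dp : dataPoints) {
                // queue metrics are assumed published as a Map of stats
                // under a system metric name such as '__receive'
                if (dp.name.startsWith("__") && dp.value instanceof Map) {
                    Object sojourn = ((Map<?, ?>) dp.value).get("sojourn_time_ms");
                    if (sojourn != null) {
                        System.out.printf("%s[%d] %s sojourn_time_ms=%s%n",
                                taskInfo.srcComponentId, taskInfo.srcTaskId,
                                dp.name, sojourn);
                    }
                }
            }
        }

        @Override
        public void cleanup() {
        }
    }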
>>>>> On Fri, Apr 29, 2016 at 9:38 AM, Jungtaek Lim <kabh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Roshan,
>>>>>>
>>>>>> Thanks for sharing your thoughts. About your thoughts, I'm in
>>>>>> favor of 1); that's what my sketch is trying to achieve.
>>>>>>
>>>>>> If we agree to go with 1), IMO the options I stated are clear. Let
>>>>>> me elaborate more.
>>>>>>
>>>>>> The root tuple is created by the "Spout", and by the definition of
>>>>>> 'complete latency' the tuple tree is considered complete by the
>>>>>> "Acker". Since the start point and the end point occur in
>>>>>> different components, we have to tolerate either the "latency of
>>>>>> handling ACK_INIT between Spout and Acker" (which changes the
>>>>>> start point to the Acker) or the "time variation between the
>>>>>> machine the Spout is running on and the machine the Acker is
>>>>>> running on". I think there's no way to avoid both, so we should
>>>>>> just choose whichever is smaller and therefore easier to ignore. I
>>>>>> agree it could feel tricky for us.
>>>>>>
>>>>>> I found some answers / articles claiming there can be
>>>>>> sub-millisecond precision within the same LAN if machines sync
>>>>>> from the same NTP server, and other articles claiming hundreds of
>>>>>> milliseconds of drift, which is not acceptable to tolerate. I
>>>>>> guess Storm doesn't currently require machines to be time-synced,
>>>>>> so this would be a new requirement for setting up a cluster.
>>>>>>
>>>>>> And the latency of handling ACK_INIT between Spout and Acker
>>>>>> depends on the hardware / cluster configuration, but normally we
>>>>>> place machines in the same rack or on the same switch, or at least
>>>>>> group them into the same LAN, which shows low latency. So it comes
>>>>>> down to the "waiting time in the transfer queue in the Spout" and
>>>>>> the "waiting time for ACK_INIT in the receive queue in the Acker".
>>>>>> But if we don't want to dig in too deeply, I guess this would be
>>>>>> fine for the normal situation, since the Acker is lightweight and
>>>>>> should keep up with the traffic.
>>>>>>
>>>>>> - Jungtaek Lim (HeartSaVioR)
>>>>>>
>>>>>> On Fri, Apr 29, 2016 at 5:41 AM, Roshan Naik
>>>>>> <ros...@hortonworks.com> wrote:
>>>>>>
>>>>>>> IMO, avoiding the time variation across machines makes total
>>>>>>> sense. But I feel that this is a tricky question.
>>>>>>>
>>>>>>> A couple more thoughts:
>>>>>>>
>>>>>>> 1) As per
>>>>>>> http://storm.apache.org/releases/current/Guaranteeing-message-processing.html
>>>>>>>
>>>>>>> "Storm can detect when the tree of tuples is fully processed and
>>>>>>> can ack or fail the spout tuple appropriately."
>>>>>>>
>>>>>>> That seems to indicate that when the ACKer has received all the
>>>>>>> necessary acks, it considers the tuple fully processed. If we go
>>>>>>> by that, and we define complete latency as the time taken to
>>>>>>> fully process a tuple, then it is not necessary to include the
>>>>>>> time it takes for the ACK to be delivered to the spout.
>>>>>>>
>>>>>>> 2) If you include the time it takes to deliver the ACK to the
>>>>>>> spout, then we also need to wonder whether we should include the
>>>>>>> time the spout takes to process the ack() call. I am unclear what
>>>>>>> it means for the idea of 'fully processed' if spout.ack() throws
>>>>>>> an exception. Here you can compute the delta either immediately
>>>>>>> before OR immediately after ack() is invoked on the spout.
>>>>>>>
>>>>>>> The benefit of including the spout's ack() processing time is
>>>>>>> that any optimizations/inefficiencies in the spout's ack()
>>>>>>> implementation will be detectable.
>>>>>>>
>>>>>>> I wonder if we should split this into two different metrics:
>>>>>>>
>>>>>>> - "delivery latency" (which ends once the ACKer receives the last
>>>>>>>   ACK from a bolt), and
>>>>>>> - "complete latency" (which includes ACK processing time by the
>>>>>>>   spout)
>>>>>>>
>>>>>>> -roshan
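Roshan's two candidate metrics differ only in where the spout-side delta
is taken. A hypothetical fragment of spout-side ack handling makes the
measurement points concrete; none of this is existing Storm code, and the
ackerDoneTs / rootEmitTs subtraction assumes synced clocks, the very issue
debated elsewhere in this thread:

    import org.apache.storm.spout.ISpout;

    // Hypothetical sketch of where the deltas could be taken on the
    // spout side when an ack message arrives from the Acker.
    public class AckTimingSketch {

        // rootEmitTs: when the spout emitted the root tuple (spout clock)
        // ackerDoneTs: when the Acker saw the tree complete (acker clock)
        public static void onAckMessage(ISpout spout, Object msgId,
                                        long rootEmitTs, long ackerDoneTs) {
            // "delivery latency": ends once the ACKer has the last ack;
            // comparable across machines only if clocks are synced
            long deliveryLatencyMs = ackerDoneTs - rootEmitTs;

            long beforeAck = System.currentTimeMillis();
            spout.ack(msgId); // may throw -- unclear what that means for
                              // the notion of "fully processed"
            long afterAck = System.currentTimeMillis();

            // "complete latency": additionally includes the spout's own
            // ack() processing time (delta taken after the call returns)
            long completeLatencyMs = afterAck - rootEmitTs;
            long spoutAckCostMs = afterAck - beforeAck;
        }
    }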
>>>>>>> On 4/28/16, 8:59 AM, "Jungtaek Lim" <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> While thinking about metrics improvements, I wonder how many
>>>>>>>> users know what 'complete latency' exactly is. In fact, it's
>>>>>>>> somewhat complicated, because additional waiting time can be
>>>>>>>> added to the complete latency due to the spout's single-threaded
>>>>>>>> event loop.
>>>>>>>>
>>>>>>>> A long-running nextTuple() / ack() / fail() can affect the
>>>>>>>> complete latency, but this happens behind the scenes. No latency
>>>>>>>> information is provided, and some users don't even know about
>>>>>>>> this characteristic. Moreover, calls to nextTuple() can be
>>>>>>>> skipped due to max spout pending, which makes it even harder to
>>>>>>>> reason about the avg. latency of nextTuple() when it is
>>>>>>>> provided.
>>>>>>>>
>>>>>>>> I think separation of threads (moving the tuple handler to a
>>>>>>>> separate thread, as JStorm provides) would close the gap, but it
>>>>>>>> requires our spout logic to be thread-safe, so I'd like to find
>>>>>>>> a workaround first.
>>>>>>>>
>>>>>>>> My sketched idea is to let the Acker decide the end time for the
>>>>>>>> root tuple. There are then two ways to decide the start time for
>>>>>>>> the root tuple:
>>>>>>>>
>>>>>>>> 1. When the Spout is about to emit ACK_INIT to the Acker (in
>>>>>>>>    other words, keep it as it is)
>>>>>>>>    - The Acker sends the ack / fail message to the Spout with a
>>>>>>>>      timestamp, and the Spout calculates the time delta.
>>>>>>>>    - pros: It's the most accurate way, since it respects the
>>>>>>>>      definition of 'complete latency'.
>>>>>>>>    - cons: The sync of machine time between machines becomes
>>>>>>>>      very important; millisecond precision would be required.
>>>>>>>>
>>>>>>>> 2. When the Acker receives ACK_INIT from the Spout
>>>>>>>>    - The Acker calculates the time delta itself, and sends the
>>>>>>>>      ack / fail message to the Spout with the time delta.
>>>>>>>>    - pros: No requirement to sync the time between servers so
>>>>>>>>      strictly.
>>>>>>>>    - cons: It doesn't include the latency to send / receive
>>>>>>>>      ACK_INIT between the Spout and the Acker.
>>>>>>>>
>>>>>>>> Sure, we could leave it as is if we decide it doesn't hurt much.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jungtaek Lim (HeartSaVioR)
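To make option 2 concrete: the Acker already tracks each root tuple with
an XOR checksum (per the Guaranteeing-message-processing doc), so it would
only need to remember one extra timestamp per pending tuple and ship the
delta back with the ack / fail message. A minimal sketch, with
hypothetical names, not Storm's actual Acker code:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of option 2: the Acker stamps the root tuple when ACK_INIT
    // arrives and computes the delta itself when the tuple tree
    // completes, so only the Acker's clock is involved and no
    // cross-machine time sync is required.
    public class AckerLatencySketch {

        private static class Entry {
            long ackVal;     // XOR of all anchor ids seen so far
            long initTimeMs; // Acker-local time when ACK_INIT arrived
        }

        private final Map<Object, Entry> pending = new HashMap<>();

        public void onAckInit(Object rootId, long initialAckVal) {
            Entry e = new Entry();
            e.ackVal = initialAckVal;
            e.initTimeMs = System.currentTimeMillis(); // start point = Acker
            pending.put(rootId, e);
        }

        // Returns the delta to ship back to the Spout with the ack
        // message, or -1 while the tuple tree is still incomplete.
        public long onAck(Object rootId, long ackVal) {
            Entry e = pending.get(rootId);
            if (e == null) {
                return -1; // unknown or already timed-out root tuple
            }
            e.ackVal ^= ackVal;
            if (e.ackVal != 0) {
                return -1; // tree not complete yet
            }
            pending.remove(rootId);
            return System.currentTimeMillis() - e.initTimeMs;
        }
    }

The trade-off stated in the mail still holds: the measured window starts
when ACK_INIT reaches the Acker, so the Spout-to-Acker leg is excluded.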