Sorry, a correction which may confuse someone: with this circumstance there's no issue to keep it as is, since *users normally don't implement ack() / fail() as long-blocking methods*.
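For context on why blocking in one spout method delays the others at all: ack(), fail() and nextTuple() are all driven by the same event-loop thread. Below is a minimal model of that loop, illustrative only and not Storm's actual executor code (AckMessage, SimpleSpout and the queue are made up for the sketch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class AckMessage {
        final Object msgId;
        final boolean success;
        AckMessage(Object msgId, boolean success) { this.msgId = msgId; this.success = success; }
    }

    interface SimpleSpout {
        void nextTuple();
        void ack(Object msgId);
        void fail(Object msgId);
    }

    class SpoutLoop {
        final BlockingQueue<AckMessage> fromAcker = new LinkedBlockingQueue<>();
        volatile boolean active = true;

        void run(SimpleSpout spout) {
            List<AckMessage> batch = new ArrayList<>();
            while (active) {
                batch.clear();
                fromAcker.drainTo(batch);            // ack/fail messages delivered by the Acker
                for (AckMessage m : batch) {
                    if (m.success) spout.ack(m.msgId);   // normally cheap
                    else spout.fail(m.msgId);
                }
                // If nextTuple() sleeps to throttle, acks arriving in the meantime
                // sit in 'fromAcker' until the next iteration, inflating the
                // complete latency measured when the spout computes the delta.
                spout.nextTuple();
            }
        }
    }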
On Fri, Apr 29, 2016 at 4:56 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Cody,
> Thanks for joining the conversation.
>
> If my understanding is right, the way JStorm handles complete latency is
> the same as what Apache Storm currently does. Please refer to
> https://github.com/apache/storm/blob/master/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L545
>
> What I really want to address is when / which component will decide the
> complete timestamp.
>
> If my understanding is right, JStorm separates the threads in the Spout:
> one is responsible for outgoing tuples, and the other is responsible for
> receiving / handling incoming tuples. In this circumstance there's no
> issue to keep it as is, since normally ack() / fail() are not implemented
> as long-blocking methods.
>
> But many users implement nextTuple() to sleep a long amount of time to
> throttle themselves (yes, I was one of them), and that decision makes
> tuples from the Acker also wait that amount of time. There are some
> exceptions: throttling is on (via backpressure), or the count of pending
> tuples is over max spout pending.
>
> So I guess the same issue persists on JStorm with a single-thread-mode
> Spout.
>
> Please let me know if I'm missing something.
>
> Thanks!
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Apr 29, 2016 at 4:32 PM, Cody Innowhere <e.neve...@gmail.com> wrote:
>
>> What we do in JStorm is to set the timestamp of a tuple before a spout
>> sends it to downstream bolts; then in the spout's ack/fail method, we
>> get the current timestamp, and by subtracting the original ts we get the
>> process latency. Note this delta includes the network cost from spouts
>> to bolts, ser/deser time, bolt process time, and the network cost from
>> the acker back to the original spout, i.e., it's almost the full life
>> cycle of the tuple.
>>
>> I'm adding this while porting executor.clj. This way, we don't need to
>> care about the time sync problem.
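To make the measurement Cody describes concrete: the spout stamps each root tuple at emit time and computes the delta on the same JVM clock in ack()/fail(), so no cross-machine time sync is involved. A minimal sketch of that idea, hand-rolled for illustration and not actual JStorm code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class TimestampingSpoutHelper {
        // message id -> emit timestamp, recorded on the spout's own clock
        private final Map<Object, Long> pendingEmitTime = new ConcurrentHashMap<>();

        // Call right before the spout emits the root tuple.
        void onEmit(Object msgId) {
            pendingEmitTime.put(msgId, System.currentTimeMillis());
        }

        // Call from the spout's ack() or fail(). The returned delta covers
        // spout->bolt transfer, ser/deser, bolt processing, and acker->spout
        // transfer, i.e. almost the tuple's full life cycle.
        long onAckOrFail(Object msgId) {
            Long emittedAt = pendingEmitTime.remove(msgId);
            return emittedAt == null ? -1 : System.currentTimeMillis() - emittedAt;
        }
    }

Since the start and end timestamps are read on the same machine, clock skew between hosts never enters the measurement, which is the point of doing it this way.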
>> On Fri, Apr 29, 2016 at 11:18 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>>> One way to confirm my assumption is valid: we could use sojourn_time_ms,
>>> which is currently provided in the queue metrics.
>>>
>>> We could look at sojourn_time_ms in the '__receive' metrics of the Spout
>>> component to verify how long messages from the Acker wait in the receive
>>> queue of the Spout.
>>>
>>> And we could also estimate the "waiting time in the transfer queue of
>>> the Spout" from sojourn_time_ms in the '__send' metrics of the Spout
>>> component, and the "waiting time of ACK_INIT in the receive queue of the
>>> Acker" from sojourn_time_ms in the '__receive' metrics of the Acker
>>> component.
>>>
>>> Since I don't have clusters/topologies for a normal use case I'm not
>>> sure what the values normally are, but at least in metrics from
>>> ThroughputVsLatency, sojourn_time_ms in '__send' of the Spout is often
>>> close to 0, and sojourn_time_ms in '__receive' of the Acker is less than
>>> 2 ms. If the transfer latency of the ACK_INIT message is tiny, the sum
>>> of latencies in option 2 would also be tiny, maybe less than 5 ms (just
>>> an assumption).
>>>
>>> I'd really like to see those metrics values (including sojourn_time_ms
>>> in '__receive' of Bolts) from various live topologies handling normal
>>> use cases, to make my assumption solid. Please share if you're logging
>>> those metrics.
>>>
>>> I'll try to go with 2) first, but I'm still open to any ideas /
>>> opinions / objections.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Fri, Apr 29, 2016 at 9:38 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>
>>>> Roshan,
>>>>
>>>> Thanks for sharing your thoughts. About your thoughts, I'm in favor of
>>>> 1); that's what my sketch is trying to achieve.
>>>>
>>>> If we agree to go with 1), IMO the options I stated are clear. Let me
>>>> elaborate more.
>>>>
>>>> The root tuple is created by the "Spout", and by the definition of
>>>> 'complete latency' the tuple tree is considered complete by the
>>>> "Acker". Since the start point and the end point occur in different
>>>> components, we have to tolerate either the "latency of handling
>>>> ACK_INIT between Spout and Acker" (which moves the start point to the
>>>> Acker) or the "time variation between the machine the Spout runs on and
>>>> the machine the Acker runs on". I think there's no way to avoid both,
>>>> so we should just choose whichever is smaller and therefore easier to
>>>> ignore. I agree it can feel tricky.
>>>>
>>>> I found some answers / articles claiming there can be sub-millisecond
>>>> precision within the same LAN if machines sync from the same NTP
>>>> server, and other articles claiming precision of hundreds of
>>>> milliseconds, which is not acceptable to tolerate. I guess Storm
>>>> doesn't currently require machines to have synced clocks, so it would
>>>> be a new requirement for setting up a cluster.
>>>>
>>>> The latency of handling ACK_INIT between Spout and Acker depends on the
>>>> hardware cluster configuration, but normally we place machines in the
>>>> same rack or behind the same switch, or at least group them in the same
>>>> LAN, which shows low latency. So it comes down to the "waiting time in
>>>> the transfer queue of the Spout" and the "waiting time of ACK_INIT in
>>>> the receive queue of the Acker". But if we don't want to dig too
>>>> deeply, I guess this would be fine for the normal situation, since the
>>>> Acker is lightweight and should keep up with the traffic.
>>>>
>>>> - Jungtaek Lim (HeartSaVioR)
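If anyone wants to collect the sojourn_time_ms values discussed above from a live topology, a small metrics consumer is enough. A sketch against the IMetricsConsumer API (the '__receive' / '__send' names and the 'sojourn_time_ms' map key follow the thread above and may differ by Storm version):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.storm.metric.api.IMetricsConsumer;
    import org.apache.storm.task.IErrorReporter;
    import org.apache.storm.task.TopologyContext;

    public class SojournLogger implements IMetricsConsumer {
        @Override
        public void prepare(Map stormConf, Object registrationArgument,
                            TopologyContext context, IErrorReporter errorReporter) { }

        @Override
        public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
            for (DataPoint dp : dataPoints) {
                // Queue metrics arrive as a map of stats; pull out the sojourn time.
                if ((dp.name.contains("__receive") || dp.name.contains("__send"))
                        && dp.value instanceof Map) {
                    Object sojourn = ((Map<?, ?>) dp.value).get("sojourn_time_ms");
                    if (sojourn != null) {
                        System.out.printf("%s task %d %s sojourn_time_ms=%s%n",
                                taskInfo.srcComponentId, taskInfo.srcTaskId, dp.name, sojourn);
                    }
                }
            }
        }

        @Override
        public void cleanup() { }
    }

Registering it via conf.registerMetricsConsumer(SojournLogger.class, 1) on the topology Config would log the values per component/task.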
>>>> On Fri, Apr 29, 2016 at 5:41 AM, Roshan Naik <ros...@hortonworks.com> wrote:
>>>>
>>>>> IMO, avoiding the time variation between machines makes total sense.
>>>>> But I feel that this is a tricky question.
>>>>>
>>>>> A couple more thoughts:
>>>>>
>>>>> 1) As per
>>>>> http://storm.apache.org/releases/current/Guaranteeing-message-processing.html
>>>>> "Storm can detect when the tree of tuples is fully processed and can
>>>>> ack or fail the spout tuple appropriately."
>>>>>
>>>>> That seems to indicate that when the ACKer has received all the
>>>>> necessary acks, it considers the tuple fully processed. If we go by
>>>>> that, and we define complete latency as the time taken to fully
>>>>> process a tuple, then it is not necessary to include the time it takes
>>>>> for the ACK to be delivered to the spout.
>>>>>
>>>>> 2) If you do include the time it takes to deliver the ACK to the
>>>>> spout, then we also need to wonder whether we should include the time
>>>>> the spout takes to process the ack() call. I am unclear what it means
>>>>> for the idea of 'fully processed' if spout.ack() throws an exception.
>>>>> Here you can compute the delta either immediately before OR
>>>>> immediately after ack() is invoked on the spout.
>>>>>
>>>>> The benefit of including the spout's ack() processing time is that any
>>>>> optimizations/inefficiencies in the spout's ack() implementation will
>>>>> be detectable.
>>>>>
>>>>> I wonder if we should split this into two different metrics...
>>>>>
>>>>> - "delivery latency", which ends once the ACKer receives the last ACK
>>>>>   from a bolt, and
>>>>> - "complete latency", which includes the ACK processing time by the
>>>>>   spout.
>>>>>
>>>>> -roshan
>>>>>
>>>>> On 4/28/16, 8:59 AM, "Jungtaek Lim" <kabh...@gmail.com> wrote:
>>>>>
>>>>>> Hi devs,
>>>>>>
>>>>>> While thinking about metrics improvements, I doubt how many users
>>>>>> know what complete latency 'exactly' is. In fact, it's somewhat
>>>>>> complicated, because additional waiting time can be added to the
>>>>>> complete latency due to the spout's single-thread event-loop model.
>>>>>>
>>>>>> A long-running nextTuple() / ack() / fail() can affect complete
>>>>>> latency, but it happens behind the scenes: no latency information is
>>>>>> provided, and some users don't even know about this characteristic.
>>>>>> Moreover, calling nextTuple() can be skipped due to max spout
>>>>>> pending, which would make the effect hard to estimate even if the
>>>>>> avg. latency of nextTuple() were provided.
>>>>>>
>>>>>> I think separating the threads (moving the tuple handler to a
>>>>>> separate thread, as JStorm provides) would resolve the gap, but it
>>>>>> requires our spout logic to be thread-safe, so I'd like to find a
>>>>>> workaround first.
>>>>>>
>>>>>> My sketched idea is to let the Acker decide the end time for the root
>>>>>> tuple. There are two subsequent ways to decide the start time for the
>>>>>> root tuple:
>>>>>>
>>>>>> 1. when the Spout is about to emit ACK_INIT to the Acker (in other
>>>>>>    words, keep it as it is)
>>>>>>    - The Acker sends the ack / fail message to the Spout with a
>>>>>>      timestamp, and the Spout calculates the time delta.
>>>>>>    - pros: It's the most accurate way, since it respects the
>>>>>>      definition of 'complete latency'.
>>>>>>    - cons: Machine-time sync between machines becomes very important;
>>>>>>      millisecond precision would be required.
>>>>>> 2. when the Acker receives ACK_INIT from the Spout
>>>>>>    - The Acker calculates the time delta itself, and sends the ack /
>>>>>>      fail message to the Spout with the time delta.
>>>>>>    - pros: No requirement to sync time between servers so strictly.
>>>>>>    - cons: It doesn't include the latency to send / receive ACK_INIT
>>>>>>      between Spout and Acker.
>>>>>>
>>>>>> Sure, we could leave it as is if we decide it doesn't hurt much.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Thanks,
>>>>>> Jungtaek Lim (HeartSaVioR)
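For reference, option 2 from the proposal boils down to the Acker reading its own clock when ACK_INIT arrives and again when the tuple tree completes, then shipping the delta (not a timestamp) back to the Spout. A toy sketch of that, including the usual XOR bookkeeping; illustrative only, not Storm's actual acker code:

    import java.util.HashMap;
    import java.util.Map;

    class AckerSketch {
        static class Entry {
            long ackVal;                                       // XOR of outstanding anchor ids
            final long startMs = System.currentTimeMillis();   // start point: ACK_INIT arrival
        }

        private final Map<Object, Entry> pending = new HashMap<>();

        void onAckInit(Object rootId, long initialAckVal) {
            Entry e = new Entry();
            e.ackVal = initialAckVal;
            pending.put(rootId, e);
        }

        // Returns the complete-latency delta when the tree finishes, else -1.
        long onAck(Object rootId, long ackVal) {
            Entry e = pending.get(rootId);
            if (e == null) return -1;
            e.ackVal ^= ackVal;
            if (e.ackVal == 0) {                 // tuple tree fully acked
                pending.remove(rootId);
                // Start and end are read from the same clock, so machine-time
                // skew drops out; what is NOT counted is the Spout->Acker
                // ACK_INIT transfer time, exactly the cons noted above.
                return System.currentTimeMillis() - e.startMs;
            }
            return -1;
        }
    }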