Cody,
Thanks for joining the conversation.

If my understanding is right, the way JStorm handles complete latency is
the same as what Apache Storm currently does.
Please refer to
https://github.com/apache/storm/blob/master/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L545

What I really want to address is when, and in which component, the
completion timestamp is decided.
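
To make the choices discussed in this thread concrete, here is a rough sketch
of which clock readings each one subtracts. It is not Storm or JStorm code:
the class, the method names and the numbers are all made up for illustration,
and the clock skew is modeled as a simple constant offset on the Acker's
machine.

```java
// Illustrative only: none of these names exist in Storm or JStorm.
class CompleteLatencyOptionsSketch {

    /** Current behaviour: start and end are both read on the spout's clock. */
    static long spoutOnlyMs(long spoutEmitMs, long spoutAckHandledMs) {
        return spoutAckHandledMs - spoutEmitMs;
    }

    /** Option 1: start on the spout's clock, end on the acker's clock. */
    static long spoutStartAckerEndMs(long spoutEmitMs, long ackerDoneMs,
                                     long ackerClockSkewMs) {
        // The acker reports its own clock reading, which is off by the skew.
        return (ackerDoneMs + ackerClockSkewMs) - spoutEmitMs;
    }

    /** Option 2: start and end are both read on the acker's clock. */
    static long ackerOnlyMs(long ackerInitReceivedMs, long ackerDoneMs) {
        // Any skew cancels out, but the ACK_INIT transfer time is not counted.
        return ackerDoneMs - ackerInitReceivedMs;
    }

    public static void main(String[] args) {
        // Made-up true times (ms): emit, ACK_INIT received, tree complete,
        // ack() finally handled on the spout, and the acker's clock offset.
        long emit = 0, initReceived = 2, done = 50, ackHandled = 100, skew = 30;
        System.out.println("spout only: " + spoutOnlyMs(emit, ackHandled));
        System.out.println("option 1:   " + spoutStartAckerEndMs(emit, done, skew));
        System.out.println("option 2:   " + ackerOnlyMs(initReceived, done));
    }
}
```

With these numbers the three results are 100, 80 and 48 ms for the same tuple
tree, which is why it matters where the two timestamps are taken.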

If my understanding is right, JStorm separates the Spout's work into two
threads: one is responsible for outgoing tuples, and the other is
responsible for receiving / handling incoming tuples. In that setup there's
no issue with keeping it as is, since ack() / fail() are normally not
implemented as long-blocking methods.

But many users implement nextTuple() to sleep for a long time to throttle
themselves (yes, I was one of them), and that decision makes tuples from the
Acker wait for that time as well. There are some exceptions: when throttling
is on (via backpressure), or when the count of pending tuples is over max
spout pending.
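
A minimal sketch of that effect, using a simulated clock rather than real
Storm or JStorm code. It assumes a simplified loop that drains the receive
queue once per iteration and then calls a nextTuple() that sleeps for a fixed
time; SpoutLoopSketch and its methods are hypothetical names.

```java
// Illustrative only: a simulated clock, not Storm or JStorm code.
class SpoutLoopSketch {

    /**
     * Simulated time (ms) at which ack() runs for an ack message that reaches
     * the spout's receive queue at ackArrivalMs (assumed to be >= 0).
     */
    static long ackHandledAtMs(long ackArrivalMs, long nextTupleSleepMs,
                               boolean separateAckThread) {
        if (separateAckThread || nextTupleSleepMs <= 0) {
            return ackArrivalMs; // handled as soon as it arrives
        }
        // Single-thread loop: the receive queue is drained only at times
        // 0, S, 2S, ... because nextTuple() sleeps S ms in every iteration.
        long iterations = (ackArrivalMs + nextTupleSleepMs - 1) / nextTupleSleepMs;
        return iterations * nextTupleSleepMs;
    }

    /** Complete latency as the spout would measure it: ack time minus emit time. */
    static long completeLatencyMs(long emitMs, long ackArrivalMs,
                                  long nextTupleSleepMs, boolean separateAckThread) {
        return ackHandledAtMs(ackArrivalMs, nextTupleSleepMs, separateAckThread) - emitMs;
    }

    public static void main(String[] args) {
        // Tuple emitted at t=0; the Acker's ack reaches the spout at t=12 ms.
        System.out.println("single thread, 100 ms sleep: "
                + completeLatencyMs(0, 12, 100, false) + " ms");
        System.out.println("separate ack thread:         "
                + completeLatencyMs(0, 12, 100, true) + " ms");
    }
}
```

With these made-up numbers the single-thread case reports 100 ms and the
two-thread case reports 12 ms, although the tuple tree completed at the same
moment in both.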

So I guess the same issue persists in JStorm when the Spout runs in
single-thread mode.

Please let me know if I'm missing something.

Thanks!
Jungtaek Lim (HeartSaVioR)



On Fri, Apr 29, 2016 at 4:32 PM, Cody Innowhere <e.neve...@gmail.com> wrote:

> What we do in JStorm is set a timestamp on a tuple before the spout sends
> it to downstream bolts. Then, in the spout's ack/fail method, we take the
> current timestamp and subtract the original one to get the process latency.
> Note that this delta includes the network cost from spouts to bolts,
> ser/deser time, bolt processing time, and the network cost from the acker
> back to the original spout, i.e., it's almost the whole life cycle of the
> tuple.
>
> I'm adding this while porting executor.clj. This way, we don't need to
> care about the time sync problem.
>
> On Fri, Apr 29, 2016 at 11:18 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
> > One way to confirm that my assumption is valid is to use sojourn_time_ms,
> > which the queue metrics currently provide.
> >
> > We could look at sojourn_time_ms in the '__receive' metrics of the Spout
> > component to verify how long messages from the Acker wait in the Spout's
> > receive queue.
> >
> > We could also estimate the "waiting time in transfer queue in Spout" from
> > sojourn_time_ms in the '__send' metrics of the Spout component, and
> > estimate the "waiting time for ACK_INIT in receive queue in Acker" from
> > sojourn_time_ms in the '__receive' metrics of the Acker component.
> >
> > Since I don't have clusters/topologies running a normal use case, I'm not
> > sure what the values normally are, but at least in the metrics from
> > ThroughputVsLatency, sojourn_time_ms in '__send' of the Spout is often
> > close to 0, and sojourn_time_ms in '__receive' of the Acker is less than
> > 2ms. If the transfer latency of the ACK_INIT message is tiny, the sum of
> > latencies for option 2 would also be tiny, maybe less than 5ms (just an
> > assumption).
> >
> > I'd really like to see those metric values (including sojourn_time_ms in
> > '__receive' of Bolts) from various live topologies that handle normal use
> > cases, to make my assumption solid. Please share if you're logging those
> > metrics.
> >
> > I'll try going with 2) first, but I'm still open to any ideas / opinions /
> > objections.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > On Fri, Apr 29, 2016 at 9:38 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> > > Roshan,
> > >
> > > Thanks for sharing your thoughts.
> > > Of your suggestions I'm in favor of 1); that's what my sketch is trying
> > > to achieve.
> > >
> > > If we agree to go with 1), IMO the options I stated are clear. Let me
> > > elaborate more.
> > >
> > > The root tuple is created in the "Spout", and by the definition of
> > > 'complete latency' the tuple tree is considered complete at the "Acker".
> > > Since the start point and end point occur in different components, we
> > > have to tolerate either the "latency of handling ACK_INIT between Spout
> > > and Acker" (which moves the start point to the Acker) or the "time
> > > variation between the machine the Spout is running on and the machine
> > > the Acker is running on". I think there's no way to avoid both, so we
> > > should just choose whichever is smaller and easier to ignore. I agree it
> > > could feel tricky.
> > >
> > > I found some answers / articles claiming sub-millisecond precision is
> > > possible within the same LAN if machines sync from the same NTP server,
> > > and other articles claiming precision of hundreds of milliseconds, which
> > > is too much to tolerate.
> > > I guess Storm doesn't require machines to be time-synced, so this would
> > > be a new requirement for setting up a cluster.
> > >
> > > And the latency of handling ACK_INIT between Spout and Acker depends on
> > > the cluster's hardware configuration, but normally we place machines in
> > > the same rack or on the same switch, or at least group them in the same
> > > LAN, which gives low latency.
> > > So it comes down to the "waiting time in transfer queue in Spout" and
> > > the "waiting time for ACK_INIT in receive queue in Acker". But if we
> > > don't want to go too deep, I guess this would be fine in normal
> > > situations, since the Acker is lightweight and should keep up with the
> > > traffic.
> > >
> > > - Jungtaek Lim (HeartSaVioR)
> > >
> > >
> > > On Fri, Apr 29, 2016 at 5:41 AM, Roshan Naik <ros...@hortonworks.com> wrote:
> > >
> > >> IMO, avoiding the time variation on machines makes total sense. But I
> > >> feel that this is a tricky question.
> > >>
> > >>
> > >> Couple more thoughts:
> > >>
> > >> 1)  As per
> > >> http://storm.apache.org/releases/current/Guaranteeing-message-processing.html
> > >>
> > >> "Storm can detect when the tree of tuples is fully processed and can ack
> > >> or fail the spout tuple appropriately."
> > >>
> > >>
> > >> That seems to indicate that when the ACKer has received all the
> > >> necessary acks, it considers the tuple fully processed. If we go by
> > >> that, and we define complete latency as the time taken to fully process
> > >> a tuple, then it is not necessary to include the time it takes for the
> > >> ACK to be delivered to the spout.
> > >>
> > >>
> > >> 2) If you include the time it takes to deliver the ACK to the spout,
> > >> then we also need to wonder if we should include the time that the spout
> > >> takes to process the ACK() call. I am unclear on what it means for the
> > >> idea of 'fully processed' if spout.ack() throws an exception. Here you
> > >> can compute the delta either immediately before OR immediately after
> > >> ACK() is invoked on the spout.
> > >>
> > >>
> > >> The benefit of including the spout's ACK() processing time is that any
> > >> optimizations/inefficiencies in the spout's ACK() implementation will be
> > >> detectable.
> > >>
> > >> I wonder if we should split this into two different metrics...
> > >>
> > >> - "delivery latency" (which ends once the ACKer receives the last ACK
> > >> from a bolt) and
> > >> - "complete latency", which includes ACK processing time by the spout
> > >>
> > >>
> > >>  -roshan
> > >>
> > >>
> > >>
> > >> On 4/28/16, 8:59 AM, "Jungtaek Lim" <kabh...@gmail.com> wrote:
> > >>
> > >> >Hi devs,
> > >> >
> > >> >While thinking about metrics improvements, I wonder how many users know
> > >> >what 'exactly' complete latency is. In fact, it's somewhat complicated,
> > >> >because additional waiting time can be added to complete latency by the
> > >> >single-threaded event loop of the spout.
> > >> >
> > >> >Long-running nextTuple() / ack() / fail() can affect complete latency,
> > >> >but it happens behind the scenes. No latency information is provided,
> > >> >and some users don't even know about this characteristic. Moreover,
> > >> >calls to nextTuple() can be skipped due to max spout pending, which
> > >> >will make it harder for us to guess even when the avg. latency of
> > >> >nextTuple() is provided.
> > >> >
> > >> >I think separating the threads (moving the tuple handler to a separate
> > >> >thread, as JStorm does) would close the gap, but it requires our spout
> > >> >logic to be thread-safe, so I'd like to find a workaround first.
> > >> >
> > >> >My sketched idea is to let the Acker decide the end time for the root
> > >> >tuple. There are then two ways to decide the start time for the root
> > >> >tuple:
> > >> >
> > >> >1. when the Spout is about to emit ACK_INIT to the Acker (in other
> > >> >words, keep it as it is)
> > >> >  - The Acker sends the ack / fail message to the Spout with a
> > >> >timestamp, and the Spout calculates the time delta
> > >> >  - pros. : It's the most accurate way, since it respects the definition
> > >> >of 'complete latency'.
> > >> >  - cons. : Clock sync between machines becomes very important.
> > >> >Millisecond precision would be required.
> > >> >2. when the Acker receives ACK_INIT from the Spout
> > >> >  - The Acker calculates the time delta itself, and sends the ack / fail
> > >> >message to the Spout with the time delta
> > >> >  - pros. : No requirement to sync the time between servers so strictly.
> > >> >  - cons. : It doesn't include the latency of sending / receiving
> > >> >ACK_INIT between Spout and Acker.
> > >> >
> > >> >Sure we could leave it as is if we decide it doesn't hurt much.
> > >> >
> > >> >What do you think?
> > >> >
> > >> >Thanks,
> > >> >Jungtaek Lim (HeartSaVioR)
> > >>
> > >>
> >
>
