@Jungtaek,
Yes, you're right. Since most of our use cases are multi-threaded spouts, this
is not a problem for us.

As for Storm, I think we can use your second workaround for now, and
re-evaluate this when we finish porting the multi-threaded spout in Phase 2.

On Fri, Apr 29, 2016 at 6:10 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> FYI: Issue STORM-1742 <https://issues.apache.org/jira/browse/STORM-1742> and
> related pull request (WIP) #1379 <https://github.com/apache/storm/pull/1379>
> are available.
>
> I've also finished a functional test, so that you can easily see what I'm
> claiming.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Apr 29, 2016 at 4:59 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>
> > Sorry, a correction for something that may confuse someone:
> >
> > Given this circumstance there's no issue with keeping it as is, since *users
> > normally don't implement ack() / fail() as long blocking methods*.
> >
> > On Fri, Apr 29, 2016 at 4:56 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> >> Cody,
> >> Thanks for joining the conversation.
> >>
> >> If my understanding is right, the way JStorm handles complete latency is
> >> the same as what Apache Storm currently does.
> >> Please refer to
> >> https://github.com/apache/storm/blob/master/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L545
> >>
> >> What I really want to address is when, and in which component, the
> >> completion timestamp is decided.
> >>
> >> If my understanding is right, JStorm separates the Spout into two threads:
> >> one responsible for outgoing tuples, the other responsible for receiving /
> >> handling incoming tuples. In that case there's no issue with keeping it as
> >> is, since ack() / fail() are normally not implemented as long blocking
> >> methods.
> >>
> >> But many users implement nextTuple() to sleep for a long time in order to
> >> throttle themselves (yes, I was one of them), and that decision makes
> >> tuples from the Acker wait for that amount of time as well. There are some
> >> exceptions: when throttling is on (via backpressure), or when the count of
> >> pending tuples exceeds max spout pending.
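> >>
> >> To illustrate, here's a minimal sketch of that throttling pattern (a
> >> hypothetical spout; pollSource() is a stand-in for a real source, and
> >> package names assume Storm 1.x). Because the executor invokes nextTuple(),
> >> ack() and fail() on the same thread, the sleep below also delays handling
> >> of messages from the Acker:
> >>
> >> import java.util.Map;
> >> import org.apache.storm.spout.SpoutOutputCollector;
> >> import org.apache.storm.task.TopologyContext;
> >> import org.apache.storm.topology.OutputFieldsDeclarer;
> >> import org.apache.storm.topology.base.BaseRichSpout;
> >> import org.apache.storm.tuple.Fields;
> >> import org.apache.storm.tuple.Values;
> >> import org.apache.storm.utils.Utils;
> >>
> >> public class ThrottledSpout extends BaseRichSpout {
> >>     private SpoutOutputCollector collector;
> >>
> >>     @Override
> >>     public void open(Map conf, TopologyContext ctx,
> >>                      SpoutOutputCollector collector) {
> >>         this.collector = collector;
> >>     }
> >>
> >>     @Override
> >>     public void nextTuple() {
> >>         String msg = pollSource();
> >>         if (msg == null) {
> >>             Utils.sleep(500);  // throttle: blocks the event loop for 500 ms
> >>             return;
> >>         }
> >>         collector.emit(new Values(msg), msg);  // msg doubles as message id
> >>     }
> >>
> >>     @Override
> >>     public void ack(Object msgId) {
> >>         // runs on the SAME thread as nextTuple(), so it waits out the sleep
> >>     }
> >>
> >>     @Override
> >>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >>         declarer.declare(new Fields("msg"));
> >>     }
> >>
> >>     private String pollSource() { return null; }  // hypothetical source
> >> }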
> >>
> >> So I guess the same issue persists in JStorm with the single-threaded
> >> Spout mode.
> >>
> >> Please let me know if I'm missing something.
> >>
> >> Thanks!
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >>
> >>
> >> On Fri, Apr 29, 2016 at 4:32 PM, Cody Innowhere <e.neve...@gmail.com> wrote:
> >>
> >>> What we do in JStorm is set a timestamp on a tuple before the spout sends
> >>> it to downstream bolts; then, in the spout's ack/fail method, we get the
> >>> current timestamp and subtract the original ts to get the process latency.
> >>> Note that this delta includes the network cost from spouts to bolts,
> >>> ser/deser time, bolt process time, and the network cost from the acker
> >>> back to the original spout, i.e., it's almost the whole life cycle of the
> >>> tuple.
> >>>
> >>> I'm adding this while porting executor.clj. This way, we don't need to
> >>> worry about the time sync problem.
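> >>>
> >>> A minimal sketch of that spout-side measurement, assuming a
> >>> per-message-id timestamp map (hypothetical names; the actual JStorm code
> >>> differs). Both timestamps are taken on the spout's machine, which is why
> >>> no cross-machine clock sync is needed:
> >>>
> >>> import java.util.Map;
> >>> import java.util.concurrent.ConcurrentHashMap;
> >>>
> >>> public class SpoutLatencyTracker {
> >>>     private final Map<Object, Long> emitTimes = new ConcurrentHashMap<>();
> >>>
> >>>     // call right before the spout emits the tuple
> >>>     public void onEmit(Object msgId) {
> >>>         emitTimes.put(msgId, System.currentTimeMillis());
> >>>     }
> >>>
> >>>     // call from the spout's ack() / fail(); returns ~full life-cycle time
> >>>     public long onAckOrFail(Object msgId) {
> >>>         Long start = emitTimes.remove(msgId);
> >>>         return (start == null) ? -1L : System.currentTimeMillis() - start;
> >>>     }
> >>> }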
> >>>
> >>> On Fri, Apr 29, 2016 at 11:18 AM, Jungtaek Lim <kabh...@gmail.com>
> >>> wrote:
> >>>
> >>> > One way to confirm that my assumption is valid is to use the
> >>> > sojourn_time_ms currently provided in the queue metrics.
> >>> >
> >>> > We could look at sojourn_time_ms in the '__receive' metrics of the Spout
> >>> > component to verify how long messages from the Acker wait in the receive
> >>> > queue of the Spout.
> >>> >
> >>> > We could also estimate the "waiting time in the transfer queue of the
> >>> > Spout" by looking at sojourn_time_ms in the '__send' metrics of the
> >>> > Spout component, and estimate the "waiting time of ACK_INIT in the
> >>> > receive queue of the Acker" by looking at sojourn_time_ms in the
> >>> > '__receive' metrics of the Acker component.
> >>> >
> >>> > Since I don't have clusters/topologies for a normal use case, I'm not
> >>> > sure what the values normally are, but at least in the metrics from
> >>> > ThroughputVsLatency, sojourn_time_ms in '__send' of the Spout is often
> >>> > close to 0, and sojourn_time_ms in '__receive' of the Acker is less
> >>> > than 2 ms. If the message transfer latency of the ACK_INIT message is
> >>> > tiny, the sum of the latencies in option 2 would also be tiny, maybe
> >>> > less than 5 ms (just an assumption).
> >>> >
> >>> > I'd really like to see those metric values (including sojourn_time_ms
> >>> > in '__receive' of Bolts) from various live topologies handling normal
> >>> > use cases, to make my assumption solid. Please share if you're logging
> >>> > those metrics.
> >>> >
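> >>> > In case it helps with collecting them: a sketch of a metrics consumer
> >>> > that logs just these values (assuming the queue data points arrive as a
> >>> > map containing a "sojourn_time_ms" key, as the disruptor queue metrics
> >>> > do in Storm 1.x):
> >>> >
> >>> > import java.util.Collection;
> >>> > import java.util.Map;
> >>> > import org.apache.storm.metric.api.IMetricsConsumer;
> >>> > import org.apache.storm.task.IErrorReporter;
> >>> > import org.apache.storm.task.TopologyContext;
> >>> >
> >>> > public class SojournTimeLogger implements IMetricsConsumer {
> >>> >     @Override
> >>> >     public void prepare(Map conf, Object arg, TopologyContext ctx,
> >>> >                         IErrorReporter reporter) {}
> >>> >
> >>> >     @Override
> >>> >     public void handleDataPoints(TaskInfo task,
> >>> >                                  Collection<DataPoint> points) {
> >>> >         for (DataPoint dp : points) {
> >>> >             if (("__receive".equals(dp.name) || "__send".equals(dp.name))
> >>> >                     && dp.value instanceof Map) {
> >>> >                 Object sojourn = ((Map) dp.value).get("sojourn_time_ms");
> >>> >                 System.out.printf("%s[%d] %s sojourn_time_ms=%s%n",
> >>> >                         task.srcComponentId, task.srcTaskId,
> >>> >                         dp.name, sojourn);
> >>> >             }
> >>> >         }
> >>> >     }
> >>> >
> >>> >     @Override
> >>> >     public void cleanup() {}
> >>> > }
> >>> >
> >>> > Registering it via conf.registerMetricsConsumer(SojournTimeLogger.class,
> >>> > 1) should be enough to get these lines into the consumer's log.
> >>> >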
> >>> > I'll try to go with 2) first, but I'm still open to any ideas /
> >>> > opinions / objections.
> >>> >
> >>> > Thanks,
> >>> > Jungtaek Lim (HeartSaVioR)
> >>> >
> >>> > On Fri, Apr 29, 2016 at 9:38 AM, Jungtaek Lim <kabh...@gmail.com> wrote:
> >>> >
> >>> > > Roshan,
> >>> > >
> >>> > > Thanks for sharing your thoughts.
> >>> > > Regarding your thoughts, I'm in favor of 1); that's what my sketch is
> >>> > > trying to achieve.
> >>> > >
> >>> > > If we agree to go with 1), IMO the options I stated are clear. Let me
> >>> > > elaborate more.
> >>> > >
> >>> > > The root tuple is created in the "Spout", and by the definition of
> >>> > > 'complete latency' the tuple tree is considered complete in the
> >>> > > "Acker". Since the start point and the end point occur in different
> >>> > > components, we have to tolerate either the "latency of handling
> >>> > > ACK_INIT between Spout and Acker" (which moves the start point to the
> >>> > > Acker) or the "time variation between the machine the Spout is
> >>> > > running on and the machine the Acker is running on". I think there's
> >>> > > no way to avoid both, so we should just choose whichever is smaller
> >>> > > and therefore easier to ignore. I agree it could feel tricky for us.
> >>> > >
> >>> > > I found some answers / articles claiming there can be
> >>> > > sub-millisecond precision within the same LAN if machines sync from
> >>> > > the same NTP server, and other articles claiming hundreds of
> >>> > > milliseconds of imprecision, which is not acceptable to tolerate.
> >>> > > I guess Storm doesn't require machines to be time-synched, so this
> >>> > > would be a new requirement for setting up a cluster.
> >>> > >
> >>> > > And the latency of handling ACK_INIT between Spout and Acker depends
> >>> > > on the hardware/cluster configuration, but normally we place machines
> >>> > > in the same rack or on the same switch, or at least group them in the
> >>> > > same LAN, which shows low latency.
> >>> > > So it comes down to the "waiting time in the transfer queue of the
> >>> > > Spout" and the "waiting time of ACK_INIT in the receive queue of the
> >>> > > Acker". But if we don't want to dig in too deeply, I guess this would
> >>> > > be fine for a normal situation, since the Acker is lightweight and
> >>> > > should keep up with the traffic.
> >>> > >
> >>> > > - Jungtaek Lim (HeartSaVioR)
> >>> > >
> >>> > >
> >>> > > On Fri, Apr 29, 2016 at 5:41 AM, Roshan Naik <ros...@hortonworks.com>
> >>> > > wrote:
> >>> > >
> >>> > >> IMO, avoiding the time variation between machines makes total
> >>> > >> sense. But I feel that this is a tricky question.
> >>> > >>
> >>> > >>
> >>> > >> Couple more thoughts:
> >>> > >>
> >>> > >> 1) As per
> >>> > >> http://storm.apache.org/releases/current/Guaranteeing-message-processing.html
> >>> > >>
> >>> > >> "Storm can detect when the tree of tuples is fully processed and
> >>> can ack
> >>> > >> or fail the spout tuple appropriately."
> >>> > >>
> >>> > >>
> >>> > >> That seems to indicate that when the ACKer has received all the
> >>> > >> necessary acks, it considers the tuple fully processed. If we go by
> >>> > >> that, and we define complete latency as the time taken to fully
> >>> > >> process a tuple, then it is not necessary to include the time it
> >>> > >> takes for the ACK to be delivered to the spout.
> >>> > >>
> >>> > >>
> >>> > >> 2) If you include the time it takes to deliver the ACK to the
> >>> > >> spout, then we also need to wonder whether we should include the
> >>> > >> time the spout takes to process the ack() call. I am unclear on what
> >>> > >> it means for the idea of 'fully processed' if spout.ack() throws an
> >>> > >> exception. Here you can compute the delta either immediately before
> >>> > >> OR immediately after ack() is invoked on the spout.
> >>> > >>
> >>> > >>
> >>> > >> The benefit of including the spout's ack() processing time is that
> >>> > >> any optimizations/inefficiencies in the spout's ack() implementation
> >>> > >> will be detectable.
> >>> > >>
> >>> > >> I wonder if we should split this into two different metrics (see
> >>> > >> the sketch below)...
> >>> > >>
> >>> > >> - "delivery latency" (which ends once the ACKer receives the last
> >>> > >> ACK from a bolt) and
> >>> > >> - "complete latency", which includes the ack() processing time in
> >>> > >> the spout
> >>> > >>
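> >>> > >> A sketch of that split (hypothetical executor-side code, not the
> >>> > >> current code path): deliveryLatencyMs would be whatever the ACKer
> >>> > >> measured, and the extra delta around the ack() call captures the
> >>> > >> spout's own processing time.
> >>> > >>
> >>> > >> import org.apache.storm.spout.ISpout;
> >>> > >>
> >>> > >> public class AckTimer {
> >>> > >>     // returns the proposed "complete latency":
> >>> > >>     // delivery latency + spout ack() processing time
> >>> > >>     public static long onAckMessage(ISpout spout, Object msgId,
> >>> > >>                                     long deliveryLatencyMs) {
> >>> > >>         long before = System.nanoTime();
> >>> > >>         spout.ack(msgId);  // user code; may be slow (or throw)
> >>> > >>         long ackMs = (System.nanoTime() - before) / 1_000_000L;
> >>> > >>         return deliveryLatencyMs + ackMs;
> >>> > >>     }
> >>> > >> }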
> >>> > >>
> >>> > >>  -roshan
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> On 4/28/16, 8:59 AM, "Jungtaek Lim" <kabh...@gmail.com> wrote:
> >>> > >>
> >>> > >> >Hi devs,
> >>> > >> >
> >>> > >> >While thinking about metrics improvements, I doubt how many users
> >>> > >> >know what 'complete latency' exactly is. In fact, it's somewhat
> >>> > >> >complicated, because additional waiting time can be added to the
> >>> > >> >complete latency due to the single-threaded event loop model of the
> >>> > >> >spout.
> >>> > >> >
> >>> > >> >Long-running nextTuple() / ack() / fail() can affect complete
> >>> > >> >latency, but it happens behind the scenes. No latency information
> >>> > >> >is provided, and some people don't even know about this
> >>> > >> >characteristic. Moreover, calling nextTuple() can be skipped due to
> >>> > >> >max spout pending, which would make an avg. latency of nextTuple()
> >>> > >> >harder to interpret even if it were provided.
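> >>> > >> >
> >>> > >> >Roughly, the spout executor behaves like the following
> >>> > >> >single-threaded loop (an illustrative simplification of
> >>> > >> >executor.clj, not the actual code):
> >>> > >> >
> >>> > >> >import org.apache.storm.spout.ISpout;
> >>> > >> >
> >>> > >> >public class SpoutLoopSketch {
> >>> > >> >    private final ISpout spout;
> >>> > >> >    private volatile boolean running = true;
> >>> > >> >    private volatile boolean throttled = false;  // backpressure flag
> >>> > >> >    private int pendingCount = 0;  // tracked elsewhere in reality
> >>> > >> >    private final int maxSpoutPending = 1000;
> >>> > >> >
> >>> > >> >    public SpoutLoopSketch(ISpout spout) { this.spout = spout; }
> >>> > >> >
> >>> > >> >    public void run() {
> >>> > >> >        while (running) {
> >>> > >> >            drainAckerMessages();  // invokes spout.ack() / fail()
> >>> > >> >            if (!throttled && pendingCount < maxSpoutPending) {
> >>> > >> >                spout.nextTuple();  // a sleep here delays the acks
> >>> > >> >            }
> >>> > >> >        }
> >>> > >> >    }
> >>> > >> >
> >>> > >> >    private void drainAckerMessages() {
> >>> > >> >        // poll the receive queue, call spout.ack() / spout.fail()
> >>> > >> >    }
> >>> > >> >}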
> >>> > >> >
> >>> > >> >I think a separation of threads (moving the tuple handler to a
> >>> > >> >separate thread, as JStorm provides) would close the gap, but it
> >>> > >> >requires our spout logic to be thread-safe, so I'd like to find a
> >>> > >> >workaround first.
> >>> > >> >
> >>> > >> >My sketched idea is to let the Acker decide the end time for the
> >>> > >> >root tuple.
> >>> > >> >There are two subsequent ways to decide the start time for the root
> >>> > >> >tuple:
> >>> > >> >
> >>> > >> >1. when the Spout is about to emit ACK_INIT to the Acker (in other
> >>> > >> >words, keep it as it is)
> >>> > >> >  - the Acker sends the ack / fail message to the Spout with a
> >>> > >> >timestamp, and the Spout calculates the time delta
> >>> > >> >  - pros. : it's the most accurate way, since it respects the
> >>> > >> >definition of 'complete latency'
> >>> > >> >  - cons. : the sync of machine time between machines becomes very
> >>> > >> >important; millisecond precision would be required
> >>> > >> >2. when the Acker receives ACK_INIT from the Spout (see the sketch
> >>> > >> >below)
> >>> > >> >  - the Acker calculates the time delta itself, and sends the ack /
> >>> > >> >fail message to the Spout with the time delta
> >>> > >> >  - pros. : no requirement to sync the time between servers so
> >>> > >> >strictly
> >>> > >> >  - cons. : it doesn't include the latency of sending / receiving
> >>> > >> >ACK_INIT between the Spout and the Acker
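> >>> > >> >
> >>> > >> >A rough sketch of option 2 on the Acker side (hypothetical names,
> >>> > >> >not the actual Acker code). Only the Acker's own clock is used,
> >>> > >> >which is why no cross-machine sync is needed:
> >>> > >> >
> >>> > >> >import java.util.HashMap;
> >>> > >> >import java.util.Map;
> >>> > >> >
> >>> > >> >public class AckerLatencySketch {
> >>> > >> >    private final Map<Object, Long> startTimes = new HashMap<>();
> >>> > >> >
> >>> > >> >    // called when ACK_INIT for a root tuple arrives from the Spout
> >>> > >> >    public void onAckInit(Object rootId) {
> >>> > >> >        startTimes.put(rootId, System.currentTimeMillis());
> >>> > >> >    }
> >>> > >> >
> >>> > >> >    // called when the tuple tree completes; the returned delta
> >>> > >> >    // would be sent back to the Spout with the ack / fail message
> >>> > >> >    public long onTreeComplete(Object rootId) {
> >>> > >> >        Long start = startTimes.remove(rootId);
> >>> > >> >        return (start == null) ? -1L
> >>> > >> >                : System.currentTimeMillis() - start;
> >>> > >> >    }
> >>> > >> >}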
> >>> > >> >
> >>> > >> >Sure, we could leave it as is if we decide it doesn't hurt much.
> >>> > >> >
> >>> > >> >What do you think?
> >>> > >> >
> >>> > >> >Thanks,
> >>> > >> >Jungtaek Lim (HeartSaVioR)
> >>> > >>
> >>> > >>
> >>> >
> >>>
> >>
>
