Yes he is :)

Thks
Amol

On Thu, Feb 11, 2016 at 7:28 PM, Timothy Farkas <[email protected]> wrote:

> Oh thanks, now I see. For some reason I thought the StreamCodec was used to
> get the partitionId when data is taken out of the buffer server, but the
> partitionId is determined when data is put into the buffer server, in which
> case Pramod is right :).
>
> Tim
>
> On Thu, Feb 11, 2016 at 7:17 PM, Amol Kekre <[email protected]> wrote:
>
> > Tim,
> > Data for windows 20-40 is already in the buffer server. Isn't it past the
> > partition decision?
> >
> > Thks
> > Amol
> >
> > On Thu, Feb 11, 2016 at 7:14 PM, Timothy Farkas <[email protected]>
> > wrote:
> >
> > > Hey Pramod,
> > >
> > > I thought APEX-339 would cause everyone to get restarted to a common
> > > checkpoint if P1 failed. But I think I misunderstood.
> > > Based on what you just said, how do you guarantee that P1 will receive the
> > > same data as before? Will Stream Codecs only apply to windows higher than a
> > > certain id?
> > >
> > > Building off of Gaurav's example, let's say we have the following
> > > situation:
> > >
> > > 1. We are using Stream Codec A.
> > > 2. Then, at window 30, we start using Stream Codec B.
> > > 3. At window 40, P1 fails and comes back to window 20.
> > > 4. P2 never fails and continues running the whole time.
> > >
> > > Will Stream Codec A still apply to the data P1 gets for windows 20-30, and
> > > will Stream Codec B be used for windows 30 and above? If not, some data
> > > could be lost or duplicated.
> > >
> > > Thanks,
> > > Tim
> > >
> > > On Thu, Feb 11, 2016 at 6:58 PM, Pramod Immaneni <[email protected]>
> > > wrote:
> > >
> > > > So P1 and P2 have an upstream operator sending data. When P1 restarts, it
> > > > will receive the same data as before, which is already present in the
> > > > buffer server. If the upstream operator has to restart, all downstream
> > > > partitions will be reset to the same checkpoint.
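A minimal sketch of why the restarted partition sees the same data, assuming a toy in-memory model (`ToyBufferServer`, `publish`, and `replay` are hypothetical names, not the Apex BufferServer API): the partition is chosen once, when the upstream operator publishes a tuple, and stored with it, so a replay reads back exactly what was routed to that partition, whichever codec is current.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class ToyBufferServer:
    """Hypothetical in-memory stand-in for the buffer server (not the Apex API)."""

    def __init__(self) -> None:
        # partition id -> (window id, tuple) pairs in publish order
        self._data: Dict[str, List[Tuple[int, int]]] = defaultdict(list)

    def publish(self, window: int, tup: int, codec: Callable[[int], str]) -> None:
        # The partition decision is made once, here, when the tuple is written.
        self._data[codec(tup)].append((window, tup))

    def replay(self, partition: str, from_window: int) -> List[Tuple[int, int]]:
        # A restarted partition re-reads what was already routed to it;
        # the currently active codec plays no part in this.
        return [(w, t) for (w, t) in self._data[partition] if w >= from_window]


def codec_a(t: int) -> str:
    # Hypothetical codec A: odd tuples to P1, even tuples to P2.
    return "P1" if t % 2 else "P2"


def codec_b(t: int) -> str:
    # Hypothetical codec B, used from window 30: everything to P2.
    return "P2"


server = ToyBufferServer()
for window in range(20, 40):
    server.publish(window, window, codec_a if window < 30 else codec_b)

# P1 restarts from window 20 and gets back exactly what codec A sent it.
p1_replay = server.replay("P1", 20)
```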
> > > >
> > > > On Thu, Feb 11, 2016 at 6:03 PM, Gaurav Gupta <[email protected]>
> > > > wrote:
> > > >
> > > > > How will it work in the following scenario?
> > > > >
> > > > > Say operator O has two partitions, P1 and P2. P1 was processing faster
> > > > > than P2, so the StreamCodec decided to send more data to P1. P1
> > > > > processes some windows, then crashes and comes back at the previous
> > > > > checkpoint. Now P1 comes up on a slow node and processes slowly, so the
> > > > > StreamCodec decides to send less data to P1. In this case, won't the
> > > > > application lose data for some windows?
> > > > >
> > > > > Similarly, in the reverse scenario, I think data will be duplicated.
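To make this concern concrete, here is a small simulation under assumed, hypothetical codecs and tuple values. It models a faulty recovery in which P1's replayed windows are re-routed with a new codec instead of being read back from what was already stored. Because P2 keeps what it processed under the old routing, some tuples are processed twice and others by no one.

```python
from collections import Counter
from typing import List

tuples: List[int] = list(range(12))  # tuples in the windows P1 replays


def old_codec(t: int) -> str:
    # Before the crash P1 was fast, so it received two of every three tuples.
    return "P2" if t % 3 == 0 else "P1"


def new_codec(t: int) -> str:
    # After recovery on a slow node, P1 receives only one of every three tuples.
    return "P1" if t % 3 == 0 else "P2"


# P2 never failed: it keeps what it already processed under the old codec.
p2_processed = [t for t in tuples if old_codec(t) == "P2"]
# Hypothetical faulty recovery: P1's replay is re-routed with the new codec.
p1_reprocessed = [t for t in tuples if new_codec(t) == "P1"]

counts = Counter(p2_processed + p1_reprocessed)
duplicated = sorted(t for t in tuples if counts[t] > 1)
lost = sorted(t for t in tuples if counts[t] == 0)
```

With these particular codecs both effects show up at once: tuples 0, 3, 6 and 9 are processed twice, and every other tuple is never processed.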
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Feb 11, 2016 at 5:46 PM, Pramod Immaneni <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Inline
> > > > > >
> > > > > > On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas <[email protected]>
> > > > > > wrote:
> > > > > >
> > > > > > > Comments inline
> > > > > > >
> > > > > > > +1 overall as well, provided Apex-339 is implemented first and it is
> > > > > > > documented that the mechanism should not be used with some stateful
> > > > > > > operators.
> > > > > > >
> > > > > > > On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni <[email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Comments inline
> > > > > > > >
> > > > > > > > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas <[email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Pramod,
> > > > > > > > >
> > > > > > > > > I agree that if APEX-339 is in place, it would work without redeploying
> > > > > > > > > containers for operators that are stateless, or for a subset of
> > > > > > > > > stateful operators.
> > > > > > > > >
> > > > > > > > > Addressing your previous questions.
> > > > > > > > >
> > > > > > > > > - The StatsListener can be used to see how far behind operators are. You
> > > > > > > > > could determine which window the operator is on, the number of tuples
> > > > > > > > > it has processed so far, or how long it takes to complete a window.
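A sketch of the lag measures listed above, assuming hypothetical per-partition stats records (`PartitionStats` and its fields are illustrative, not the Apex StatsListener or stats classes). It computes how many windows each partition trails the most advanced one, and each partition's average window duration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PartitionStats:
    # Hypothetical fields standing in for what a stats listener could observe.
    current_window: int
    tuples_processed: int
    window_durations_ms: List[int] = field(default_factory=list)


def windows_behind(stats: Dict[str, PartitionStats]) -> Dict[str, int]:
    # Lag in windows relative to the most advanced partition.
    lead = max(s.current_window for s in stats.values())
    return {name: lead - s.current_window for name, s in stats.items()}


def avg_window_ms(s: PartitionStats) -> float:
    # Average time to complete a window; 0.0 if nothing has been recorded yet.
    if not s.window_durations_ms:
        return 0.0
    return sum(s.window_durations_ms) / len(s.window_durations_ms)


stats = {
    "P1": PartitionStats(40, 9000, [500, 520]),
    "P2": PartitionStats(34, 4000, [900, 1100]),
}
lag = windows_behind(stats)
```

Tuple counts are kept in the record but deliberately not used for lag here, since tuples can differ in size.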
> > > > > > > > >
> > > > > > > >
> > > > > > > > What if tuples are of different sizes, so the number of tuples processed
> > > > > > > > doesn't reflect how far ahead or behind a downstream partition is? How is
> > > > > > > > the information from the StatsListener made available to the upstream
> > > > > > > > partition codecs?
> > > > > > > >
> > > > > > > What information can the Buffer Server provide that the StatsListener
> > > > > > > cannot?
> > > > > > >
> > > > > >
> > > > > > The stats information would have to be relayed back to the upstream
> > > > > > operators. It's possible.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > The StatsListener can trigger a repartition. The information in the
> > > > > > > StatsListener can be shared with the partitioner by setting the same
> > > > > > > object for both in populateDAG. The partitioner can then compute the new
> > > > > > > Stream Codec. The mechanism by which the upstream operator would be
> > > > > > > updated with the new Stream Codec would have to be implemented, as it is
> > > > > > > currently not there.
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > - Some examples of stateful operators that require repartitioning of
> > > > > > > > > state are the following:
> > > > > > > > >       - Deduper
> > > > > > > > >            In this case, after the stream codec is updated, the operator
> > > > > > > > >            may let a previously seen value pass, because this partition
> > > > > > > > >            did not receive that value under the previous stream codec.
> > > > > > > > >       - A key-value store that holds aggregations for each key.
> > > > > > > > >            In this case, multiple partitions would hold partial
> > > > > > > > >            aggregations for a key, when each expects to hold the
> > > > > > > > >            complete aggregation.
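A small illustration of both failure modes, assuming two hypothetical codecs and per-partition state that is not moved when the codec changes. The same key lands on a different partition after the switch, so the deduper lets it through a second time and the key's aggregate ends up split across partitions.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Set

Codec = Callable[[str], int]


def codec_a(key: str) -> int:
    return len(key) % 2          # hypothetical original routing


def codec_b(key: str) -> int:
    return (len(key) + 1) % 2    # hypothetical new routing: every key changes partition


seen: Dict[int, Set[str]] = defaultdict(set)                              # per-partition deduper state
sums: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int))   # per-partition aggregates
passed: List[str] = []


def process(key: str, value: int, codec: Codec) -> None:
    p = codec(key)
    if key not in seen[p]:       # the deduper only knows what its own partition has seen
        seen[p].add(key)
        passed.append(key)
    sums[p][key] += value        # the aggregate only covers what this partition received


process("apple", 1, codec_a)
process("apple", 2, codec_b)     # codec switched; state was not repartitioned
```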
> > > > > > > > >
> > > > > > > >
> > > > > > > > Agreed for the deduper. For the second case, a unifier is a better
> > > > > > > > approach, so that you are not affected by key skew in general.
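A sketch of the unifier idea, assuming each partition emits partial per-key sums for a window and a hypothetical `unify` step merges them downstream. The complete aggregate then no longer depends on which partition each tuple was routed to, so key skew or a codec change does not split the result.

```python
from collections import Counter
from typing import Iterable, Mapping, Tuple


def partial_sums(tuples: Iterable[Tuple[str, int]]) -> Counter:
    # Work done inside one partition for one window: sum values per key.
    out: Counter = Counter()
    for key, value in tuples:
        out[key] += value
    return out


def unify(partials: Iterable[Mapping[str, int]]) -> Counter:
    # Merge the partial aggregates from all partitions into the complete aggregate.
    total: Counter = Counter()
    for part in partials:
        total.update(part)  # Counter.update adds counts rather than replacing them
    return total


# "apple" reached both partitions, e.g. because the codec changed mid-stream.
p1 = partial_sums([("apple", 1), ("pear", 4)])
p2 = partial_sums([("apple", 2)])
result = unify([p1, p2])
```

As the thread notes, this only works when the aggregation can be merged this way, which is not always the case.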
> > > > > > > >
> > > > > > > This is not always possible. We can discuss this offline, since going
> > > > > > > into the details won't add much to the discussion here.
> > > > > > >
> > > > > > >
> > > > > > Yes, not always.
> > > > > >
> > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Tim
> > > > > > > > >
> > > > > > > > > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni <[email protected]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Additionally, it can be treated as a non-idempotent stream for recovery;
> > > > > > > > > > look at APEXCORE-339. In cases where the downstream partitions require
> > > > > > > > > > some key-based partitioning, what you are suggesting would be a good
> > > > > > > > > > approach, but it will require more complex logic in the StreamCodec to
> > > > > > > > > > do both key-based and load-based partitioning.
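One way such combined logic could look, as a hypothetical sketch (`StickyLoadCodec` and its policy are assumptions, not an existing library codec): the first time a key is seen it goes to the partition that has received the least data so far, and afterwards it stays there. Key-based state therefore stays in one place while new keys follow load.

```python
from typing import Dict, List


class StickyLoadCodec:
    """Hypothetical codec: a key's first sighting picks the least-loaded partition; later ones stick."""

    def __init__(self, num_partitions: int) -> None:
        self.load: List[int] = [0] * num_partitions   # e.g. bytes routed to each partition so far
        self.assignment: Dict[str, int] = {}

    def get_partition(self, key: str, size: int = 1) -> int:
        if key not in self.assignment:
            # Load-based choice, made only once per key (ties go to the lowest index).
            self.assignment[key] = min(range(len(self.load)), key=lambda p: self.load[p])
        p = self.assignment[key]
        self.load[p] += size
        return p


codec = StickyLoadCodec(2)
routes = [codec.get_partition(k, s) for k, s in [("a", 10), ("b", 1), ("a", 10), ("c", 1)]]
```

The key-to-partition map is itself state, so it would have to be recovered consistently for the recovery questions raised in this thread to have good answers.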
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <[email protected]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > How would you know how far behind partitions are without interacting
> > > > > > > > > > > with the BufferServer, as you were mentioning in the earlier email?
> > > > > > > > > > > Secondly, why would changing where the data is sent, based on load,
> > > > > > > > > > > mandate re-partitioning if the downstream partitions can handle data
> > > > > > > > > > > with different keys?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <[email protected]>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hey Pramod,
> > > > > > > > > > >>
> > > > > > > > > > > >> I think in general, and for recovery, the existing partitioning
> > > > > > > > > > > >> machinery can be reused to update the Stream Codec.
> > > > > > > > > > > >> The reason is that if the operator is stateful and changes are made
> > > > > > > > > > > >> to the Stream Codec, the state of the partitions will also have to be
> > > > > > > > > > > >> repartitioned.
> > > > > > > > > > > >> In this case the number of partitions will remain the same; only the
> > > > > > > > > > > >> state of the partitions is reshuffled. Reshuffling this state in a
> > > > > > > > > > > >> fault-tolerant way is already handled by the dynamic partitioning
> > > > > > > > > > > >> logic, so it could be extended to update the Stream Codec as well.
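A sketch of the state reshuffle described above, assuming keyed per-partition state held in plain dictionaries (the `reshuffle_state` helper is hypothetical). The number of partitions stays the same, and each key's state moves to whichever partition the new codec routes that key to.

```python
from typing import Callable, Dict, List


def reshuffle_state(
    states: List[Dict[str, int]], new_codec: Callable[[str], int]
) -> List[Dict[str, int]]:
    # Same number of partitions before and after; only the placement of keys changes.
    new_states: List[Dict[str, int]] = [dict() for _ in states]
    for state in states:
        for key, value in state.items():
            # Assumes each key was owned by exactly one partition under the old
            # key-based codec, so assigning (rather than merging) is enough.
            new_states[new_codec(key)][key] = value
    return new_states


def new_codec(key: str) -> int:
    # Hypothetical new routing decided by the partitioner.
    return 0 if key in ("a", "b") else 1


old = [{"a": 1, "c": 3}, {"b": 2}]
new = reshuffle_state(old, new_codec)
```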
> > > > > > > > > > >>
> > > > > > > > > > > >> If the operator is stateless, it may be possible to do this without
> > > > > > > > > > > >> redeploying any containers. But the way I am envisioning it, I think
> > > > > > > > > > > >> there would be a lot of difficult-to-handle corner cases for recovery.
> > > > > > > > > > >>
> > > > > > > > > > >> Tim
> > > > > > > > > > >>
> > > > > > > > > > > >> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <[email protected]>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > Comment inline.
> > > > > > > > > > >> >
> > > > > > > > > > > >> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <[email protected]>
> > > > > > > > > > > >> > wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > +1 for the idea.
> > > > > > > > > > >> > >
> > > > > > > > > > > >> > > Gaurav, this could be done idempotently in the same way that dynamic
> > > > > > > > > > > >> > > repartitioning is done idempotently. All the partitions are rolled back
> > > > > > > > > > > >> > > to a common checkpoint, and the new StreamCodec is applied starting from
> > > > > > > > > > > >> > > that checkpoint. The statistics that the StreamCodec is given are the
> > > > > > > > > > > >> > > statistics for the windows computed before the common checkpoint that
> > > > > > > > > > > >> > > the partitions are rolled back to.
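A minimal sketch of applying a new codec only from a given window onwards, assuming a hypothetical schedule that records the window id at which each codec takes effect (`CodecSchedule` and `switch_at` are illustrative names). Routing for a tuple depends on the window it belongs to, so windows before the switch keep the old routing even when they are replayed.

```python
import bisect
from typing import Callable, List

Codec = Callable[[int], int]


class CodecSchedule:
    """Hypothetical: which codec applies to which windows, keyed by starting window id."""

    def __init__(self, initial: Codec) -> None:
        self.starts: List[int] = [0]
        self.codecs: List[Codec] = [initial]

    def switch_at(self, window: int, codec: Codec) -> None:
        # The new codec takes effect from `window` (e.g. the common checkpoint) onwards.
        if window <= self.starts[-1]:
            raise ValueError("a switch must come after the previous one")
        self.starts.append(window)
        self.codecs.append(codec)

    def partition(self, window: int, tup: int) -> int:
        # Use the last codec whose starting window is at or before this window.
        i = bisect.bisect_right(self.starts, window) - 1
        return self.codecs[i](tup)


schedule = CodecSchedule(lambda t: t % 2)   # hypothetical codec A
schedule.switch_at(30, lambda t: 0)         # hypothetical codec B from window 30
```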
> > > > > > > > > > >> > >
> > > > > > > > > > > >> > > In fact, I think this feature could be added easily by avoiding the
> > > > > > > > > > > >> > > buffer server entirely and allowing the Partitioner to redefine the
> > > > > > > > > > > >> > > StreamCodec for the operator when definePartitions is called.
> > > > > > > > > > >> > >
> > > > > > > > > > >> >
> > > > > > > > > > > >> > Are you saying this in the context of recovery or in general?
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > Thanks,
> > > > > > > > > > >> > > Tim
> > > > > > > > > > >> > >
> > > > > > > > > > > >> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <[email protected]>
> > > > > > > > > > > >> > > wrote:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > > Gaurav,
> > > > > > > > > > > >> > > > It would not be idempotent per partition, but it will be across all
> > > > > > > > > > > >> > > > partitions combined. In this case, the user would have explicitly
> > > > > > > > > > > >> > > > asked for such a pattern.
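To illustrate this distinction under a toy assumption (two hypothetical codecs, all partitions rolled back together and the same windows replayed): the contents of each partition differ between the original run and the replay, but the union across all partitions is the same multiset of tuples.

```python
from collections import Counter
from typing import Callable, Dict, List


def route(tuples: List[int], codec: Callable[[int], str]) -> Dict[str, List[int]]:
    # Split one window's tuples between the two partitions using the given codec.
    out: Dict[str, List[int]] = {"P1": [], "P2": []}
    for t in tuples:
        out[codec(t)].append(t)
    return out


window_data = list(range(10))
first_run = route(window_data, lambda t: "P1" if t < 7 else "P2")
replay = route(window_data, lambda t: "P1" if t < 3 else "P2")  # codec changed on rollback

per_partition_same = first_run == replay
combined_same = Counter(first_run["P1"] + first_run["P2"]) == Counter(replay["P1"] + replay["P2"])
```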
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Thks,
> > > > > > > > > > >> > > > Amol
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <[email protected]>
> > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > > Pramod,
> > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > How would it work with recovery? There could be cases where a tuple
> > > > > > > > > > > >> > > > > went to P1 and, post recovery, goes to P2.
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Gaurav
> > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <[email protected]>
> > > > > > > > > > > >> > > > > wrote:
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > > Hi,
> > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > There are scenarios where the downstream partitions of an upstream
> > > > > > > > > > > >> > > > > > operator are generally not performing uniformly, resulting in overall
> > > > > > > > > > > >> > > > > > sub-optimal performance dictated by the slowest partitions. The
> > > > > > > > > > > >> > > > > > reasons could be data related, such as some partitions receiving
> > > > > > > > > > > >> > > > > > more data to process than others, or environment related, such as
> > > > > > > > > > > >> > > > > > some partitions running slower than others because they are on
> > > > > > > > > > > >> > > > > > heavily loaded nodes.
> > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > A solution based on functionality currently available in the engine
> > > > > > > > > > > >> > > > > > would be to write a StreamCodec implementation that distributes data
> > > > > > > > > > > >> > > > > > among the partitions such that each partition receives a similar
> > > > > > > > > > > >> > > > > > amount of data to process. We should consider adding StreamCodecs
> > > > > > > > > > > >> > > > > > like these to the library; however, they do not solve the problem
> > > > > > > > > > > >> > > > > > when it is environment related.
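A sketch of such a data-balancing codec, assuming the serialized size of each tuple is known and a hypothetical `get_partition` hook in the spirit of a StreamCodec's partition method (`LeastBytesCodec` is an illustrative name, not a library class). Each tuple goes to the partition that has received the fewest bytes so far. Counting bytes rather than tuples is a design choice that also covers the concern, raised in this thread, that tuples can differ in size.

```python
import heapq
from typing import List, Tuple


class LeastBytesCodec:
    """Hypothetical codec: route each tuple to the partition with the fewest bytes so far."""

    def __init__(self, num_partitions: int) -> None:
        # Min-heap of (bytes sent, partition id); ties go to the lower partition id.
        self._heap: List[Tuple[int, int]] = [(0, p) for p in range(num_partitions)]
        heapq.heapify(self._heap)

    def get_partition(self, payload: bytes) -> int:
        sent, p = heapq.heappop(self._heap)
        heapq.heappush(self._heap, (sent + len(payload), p))
        return p


codec = LeastBytesCodec(2)
routes = [codec.get_partition(b) for b in [b"x" * 100, b"y", b"z", b"w" * 50]]
```

This balances what is sent, not how fast it is consumed, so it addresses only the data-related case.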
> > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > For that, a better and more comprehensive approach would be to look
> > > > > > > > > > > >> > > > > > at how data is being consumed by the downstream partitions from the
> > > > > > > > > > > >> > > > > > BufferServer and use that information to decide how to send future
> > > > > > > > > > > >> > > > > > data. If some partitions are behind others in consuming data, then
> > > > > > > > > > > >> > > > > > data can be directed to the other partitions. One way to do this
> > > > > > > > > > > >> > > > > > would be to relay this type of statistical and positional
> > > > > > > > > > > >> > > > > > information from the BufferServer to the upstream publishers. The
> > > > > > > > > > > >> > > > > > publishers can use this information in ways such as making it
> > > > > > > > > > > >> > > > > > available to StreamCodecs to affect the destination of future data.
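A sketch of how a publisher might use such relayed positional information. It assumes the buffer server reports, for each downstream partition, how far that partition's reader trails the publisher's write position. The `BacklogReport` shape and the `choose_partition` function are hypothetical, not an existing BufferServer or StreamCodec API. Among the candidate partitions, the publisher picks the one with the smallest backlog.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class BacklogReport:
    # Hypothetical per-partition positions relayed from the buffer server to the publisher.
    written_offset: int   # how much has been published for this partition
    read_offset: int      # how much the downstream partition has consumed

    @property
    def backlog(self) -> int:
        return self.written_offset - self.read_offset


def choose_partition(candidates: Iterable[str], reports: Dict[str, BacklogReport]) -> str:
    # Prefer the partition that is furthest ahead in consuming (smallest backlog);
    # ties are broken by name so the same reports always give the same choice.
    return min(candidates, key=lambda p: (reports[p].backlog, p))


reports = {
    "P1": BacklogReport(written_offset=5000, read_offset=4900),  # nearly caught up
    "P2": BacklogReport(written_offset=5000, read_offset=1000),  # e.g. on a heavily loaded node
}
target = choose_partition(["P1", "P2"], reports)
```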
> > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > What do you think?
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > Thanks
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
