Yes he is :)
Thks
Amol
On Thu, Feb 11, 2016 at 7:28 PM, Timothy Farkas wrote:

Oh thanks, now I see. For some reason I thought the streamCodec is used to get the partitionId when data is taken out of buffer server, but the partitionId is determined when data is put into buffer server. In which case Pramod is right :).

Tim

On Thu, Feb 11, 2016 at 7:17 PM, Amol Kekre wrote:

Tim,
Data for window 20-40 is already in bufferserver. It is post partition decision?

Thks
Amol

On Thu, Feb 11, 2016 at 7:14 PM, Timothy Farkas wrote:

Hey Pramod,

I thought APEX-339 would cause everyone to get restarted to a common checkpoint if P1 failed. But I think I misunderstood.
Based on what you just said, how do you guarantee that P1 will receive the same data as before? Will Stream Codecs only apply to windows higher than a certain id?

Building off of Gaurav's example, let's say we have the following situation:

1. We are using Stream Codec A.
2. Then on Window 30 we start using Stream Codec B.
3. At Window 40, P1 fails and comes back to window 20.
4. P2 never failed and continues running this whole time.

Will Stream Codec A still apply to the data P1 gets for windows 20 - 30, and Stream Codec B be used for windows 30 and above? If not, some data could be lost or duplicated.

Thanks,
Tim

On Thu, Feb 11, 2016 at 6:58 PM, Pramod Immaneni wrote:

So P1 and P2 have an upstream operator sending data; when P1 restarts it will receive the same data as before, already present in buffer server. If the upstream operator has to restart, all downstream will be reset to the same checkpoint.
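Amol and Tim settle on a key point above: the partition is chosen when a tuple is written to the buffer server, not when it is read back. A minimal toy simulation of Tim's 7:14 PM scenario makes that concrete. It assumes one buffer of stored tuples per (partition, window). `ToyBufferServer`, `codec_a` and `codec_b` are hypothetical names, and the model is not the actual Apex buffer server.

```python
from collections import defaultdict

# Hypothetical stand-ins for "Stream Codec A" and "Stream Codec B";
# each maps a tuple to partition 0 (P1) or 1 (P2).
def codec_a(t: int) -> int:
    return t % 2                      # even tuples go to P1

def codec_b(t: int) -> int:
    return 0 if t % 3 == 0 else 1     # fewer tuples go to P1


class ToyBufferServer:
    """Toy model: the publisher picks the partition when it writes,
    and stored data is keyed by (partition, window)."""

    def __init__(self):
        self.store = defaultdict(list)

    def publish(self, window, tuples, codec):
        for t in tuples:
            self.store[(codec(t), window)].append(t)

    def replay(self, partition, from_window, to_window):
        # A restarted subscriber reads what was stored; no codec is consulted.
        out = []
        for w in range(from_window, to_window):
            out.extend(self.store.get((partition, w), []))
        return out


server = ToyBufferServer()
for window in range(20, 40):
    codec = codec_a if window < 30 else codec_b   # switch codecs at window 30
    server.publish(window, range(window * 10, window * 10 + 10), codec)

# P1 fails at window 40 and restarts from its checkpoint at window 20.
p1_replay = server.replay(0, 20, 40)
p2_data = server.replay(1, 20, 40)

# What P1 would get if the current codec (B) were applied on the way out instead.
p1_if_rerouted = [t for t in range(200, 400) if codec_b(t) == 0]
```

Under this assumption, `p1_replay` holds the codec A tuples for windows 20 to 29 and the codec B tuples for windows 30 to 39. Together with `p2_data`, it covers every tuple exactly once. Re-applying the current codec on read (`p1_if_rerouted`) would give P1 a different set, which is the loss or duplication Tim was asking about.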
On Thu, Feb 11, 2016 at 6:03 PM, Gaurav Gupta wrote:

How will it work in the following scenario?

Say Operator O has two partitions P1 and P2. P1 was processing faster than P2 and the streamcodec decided to send more data to P1. P1 processes some windows, then P1 crashes and comes back to the previous checkpoint. Now P1 comes up on a node which is slow and it processes slowly. So the streamcodec decides to send less data to P1. In this case, won't the application lose data for some windows?

Similarly, in the reverse scenario, I think there will be duplicate data.

On Thu, Feb 11, 2016 at 5:46 PM, Pramod Immaneni wrote:

Inline

On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas wrote:

> Comments inline
>
> +1 Overall as well, provided Apex-339 is implemented first and it is documented that the mechanism should not be used with some stateful operators.
>
> On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni wrote:
>
> > Comments inline
> >
> > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas wrote:
> >
> > > Hey Pramod,
> > >
> > > I agree if APEX-339 is in place then it would work without redeploying containers for operators that are Stateless, or a subset of Stateful operators.
> > >
> > > Addressing your previous questions.
> > >
> > > - The StatsListener can be used to see how far behind operators are. You could determine what window the operator is on, or the number of tuples it's processed so far, or how long it takes it to complete a window.
> >
> > What if tuples are different sizes and the number of tuples processed doesn't reflect how far ahead or behind a downstream partition is? How is the information from StatsListener made available to the upstream partition codecs?
>
> What is the information Buffer Server can provide that the StatsListener cannot?

The stats information would have to be relayed down to the upstream operators. It's possible.

> The StatsListener can trigger a repartition. The information in the StatsListener can be shared with the partitioner by setting the same object for both in populateDAG. The partitioner can then compute the new Stream Codec. The mechanism by which the upstream would be updated with the new Stream Codec would have to be implemented, as it's currently not there.
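Pramod's 4:20 PM question about tuples of different sizes can be illustrated with made-up numbers. The partition that looks furthest behind by outstanding tuple count is not necessarily the one furthest behind by outstanding bytes. `Progress` and the two helpers are hypothetical and are not part of the StatsListener API. The sketch only assumes that sent and processed counts are available for each partition.

```python
from dataclasses import dataclass


@dataclass
class Progress:
    """Hypothetical progress snapshot for one downstream partition."""
    tuples_sent: int
    tuples_processed: int
    bytes_sent: int
    bytes_processed: int


def most_behind_by_tuples(parts):
    # Backlog measured as outstanding tuple count.
    return max(parts, key=lambda p: parts[p].tuples_sent - parts[p].tuples_processed)


def most_behind_by_bytes(parts):
    # Backlog measured as outstanding bytes.
    return max(parts, key=lambda p: parts[p].bytes_sent - parts[p].bytes_processed)


parts = {
    # P1: many small (10-byte) tuples outstanding.
    "P1": Progress(tuples_sent=1000, tuples_processed=900,
                   bytes_sent=10_000, bytes_processed=9_000),
    # P2: few, but large (10,000-byte), tuples outstanding.
    "P2": Progress(tuples_sent=100, tuples_processed=90,
                   bytes_sent=1_000_000, bytes_processed=900_000),
}
```

Here P1 has 100 tuples outstanding against P2's 10, so counting tuples points at P1. In bytes, P2 is 100,000 behind against P1's 1,000, which is the mismatch Pramod raised.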
> > >
> > > - Some examples of Stateful operators that require repartitioning of state are the following:
> > >   - Deduper
> > >     In this case, after updating the stream codec, the operator may allow a previously seen value to pass because the partition didn't receive that value with the previous stream codec.
> > >   - A key value store that holds aggregations for each key.
> > >     In this case, multiple partitions would hold partial aggregations for a key, when they are expecting to hold the complete aggregation.
> >
> > Agreed for deduper. For the second case a unifier is a better approach so that you are not affected by key skew in general.
>
> This is not always possible. We can discuss this offline, since it won't add much to the discussion here to go into the details.

Yes, not always.

> > >
> > > Tim

On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni wrote:

Additionally it can be treated as a non-idempotent stream for recovery. Look at APEXCORE-339.
In cases where the downstream partitions require some key-based partitioning, what you are suggesting would be a good approach, but it will require more complex logic in the StreamCodec to do both key- and load-based partitioning.

On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni wrote:

How would you know how far behind partitions are without interacting with BufferServer, like you were mentioning in the earlier email? Secondly, why would changing where the data is sent to based on load mandate re-partitioning if the downstream partitions can handle data with different keys?

On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas wrote:

Hey Pramod,

I think in general and for recovery the existing Partitioning machinery can be reused to update the Stream Codec.
The reason is that if the operator is Stateful and changes are made to the Stream Codec, the state of the partitions will also have to be repartitioned.
In this case the number of partitions will remain the same; just the state of the partitions is reshuffled. The implementation for this state reshuffling in a fault-tolerant way is already handled by the Dynamic Partitioning logic, so it could be extended to update the Stream Codec as well.

If the operator is Stateless, it may be possible to do this without redeploying any containers. But with the way I am envisioning it, I think there would be a lot of difficult-to-handle corner cases for recovery.

Tim

On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni wrote:

Comment inline.

On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas wrote:

> +1 for the idea.
>
> Gaurav, this could be done idempotently in the same way that dynamic repartitioning is done idempotently. All the partitions are rolled back to a common checkpoint and the new StreamCodec is applied starting then.
> The statistics that the Stream Codec is given are the statistics for the windows computed before the common checkpoint that the partitions are rolled back to.
>
> In fact I think this feature could be added easily by avoiding buffer server entirely and by allowing the Partitioner to redefine the StreamCodec for the operator when definePartitions is called.

Are you saying this in context of recovery or in general?

> Thanks,
> Tim

On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre wrote:

Gaurav,
It would not be idempotent per partition, but will be across all partitions combined. In this case the user would have explicitly asked for such a pattern.
Thks,
Amol

On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta wrote:

Pramod,

How would it work with recovery? There could be cases where a tuple went to P1 and post recovery it can go to P2.

Gaurav

On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni wrote:

Hi,

There are scenarios where the downstream partitions of an upstream operator are generally not performing uniformly, resulting in an overall sub-optimal performance dictated by the slowest partitions.
The reasons could be data related, such as some partitions receiving more data to process than the others, or environment related, such as some partitions running slower than others because they are on heavily loaded nodes.

A solution based on currently available functionality in the engine would be to write a StreamCodec implementation to distribute data among the partitions such that each partition is receiving a similar amount of data to process. We should consider adding StreamCodecs like these to the library, but these do not solve the problem when it is environment related.
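The sketch below illustrates the data-related case described above. It contrasts key-based routing with routing that balances the bytes it sends, assuming a skewed input where one hot key carries most of the tuples. `key_partition` and `least_sent_partition` are hypothetical helpers, not library StreamCodecs, and crc32 is only a convenient deterministic hash.

```python
import zlib
from collections import Counter


def key_partition(key: str, n: int) -> int:
    # Key-based routing (crc32 of the key), standing in for a key-hashing codec.
    return zlib.crc32(key.encode()) % n


def least_sent_partition(sent: list, size: int) -> int:
    # Data-balancing routing: pick the partition sent the fewest bytes so far.
    target = sent.index(min(sent))
    sent[target] += size
    return target


# Skewed input: 90 tuples share one hot key, 10 tuples have distinct keys.
tuples = [("hot", 100)] * 90 + [(f"k{i}", 100) for i in range(10)]
n = 2

by_key = Counter()
for key, size in tuples:
    by_key[key_partition(key, n)] += size

sent = [0] * n
for _key, size in tuples:
    least_sent_partition(sent, size)
```

Key-based routing sends all 9,000 bytes of the hot key to one partition, while the balancing version splits the 10,000 bytes evenly. Because `least_sent_partition` only tracks what it has sent, it gives up key affinity. It also cannot tell when a partition is slow because its node is heavily loaded.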
For that, a better and more comprehensive approach would be to look at how data is being consumed by the downstream partitions from the BufferServer and use that information to make decisions on how to send future data. If some partitions are behind others in consuming data, then data can be directed to the other partitions. One way to do this would be to relay this type of statistical and positional information from BufferServer to the upstream publishers. The publishers can use this information in ways such as making it available to StreamCodecs to affect the destination of future data.
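The buffer-server feedback idea above could look roughly like the sketch below. It assumes the publisher is told how many bytes each downstream partition has consumed. `PartitionPosition`, `BacklogAwareRouter` and `report_consumed` are hypothetical names. How the positions would actually be relayed from BufferServer to the publisher is left open, as it is in the proposal.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionPosition:
    """Hypothetical positional info the publisher keeps per downstream partition."""
    published_bytes: int = 0  # bytes written to the buffer server for this partition
    consumed_bytes: int = 0   # bytes the partition has read, as reported back

    @property
    def backlog(self) -> int:
        return self.published_bytes - self.consumed_bytes


@dataclass
class BacklogAwareRouter:
    """Route each tuple to the partition with the smallest unread backlog."""
    positions: List[PartitionPosition] = field(default_factory=list)

    def route(self, payload: bytes) -> int:
        # min() returns the first index on ties, so ties go to the lowest partition.
        target = min(range(len(self.positions)),
                     key=lambda i: self.positions[i].backlog)
        self.positions[target].published_bytes += len(payload)
        return target

    def report_consumed(self, partition: int, consumed_bytes: int) -> None:
        # Stand-in for the feedback that would be relayed from the buffer server.
        self.positions[partition].consumed_bytes = consumed_bytes


router = BacklogAwareRouter([PartitionPosition(), PartitionPosition()])
choices = [
    router.route(b"a" * 100),  # both idle: partition 0
    router.route(b"b" * 10),   # backlogs 100 vs 0: partition 1
    router.route(b"c" * 10),   # backlogs 100 vs 10: partition 1
]
router.report_consumed(0, 100)          # partition 0 catches up
choices.append(router.route(b"d" * 5))  # backlogs 0 vs 20: partition 0
```

If a partition slows down because its node is loaded, its backlog grows and new tuples shift to its peers, which a purely send-side balancer cannot do. The decisions depend on timing, so a replay would not reproduce them. That is why, in the replies, recovery relies on the data already stored in the buffer server and on treating the stream as non-idempotent (APEXCORE-339). Routing purely by backlog also ignores keys, so it only suits downstream partitions that can handle any key.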
What do you think?

Thanks
