This will work if a crash in a single partition triggers a restart of the other partitions too. Given that hardware failures are rare and the user wants a latency SLA, it is a viable trade-off: "costlier recovery on outage vs. load balancing". Do note that since the partitions are distributed, the overall latency impact of an outage will not necessarily go up.
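The condition above, that a crash in one partition restarts all of them, amounts to every partition rolling back to the newest checkpoint they have in common. The following is a minimal sketch, assuming checkpoints are identified by window ids. `CommonCheckpoint` and `latestCommon` are hypothetical names rather than Apex APIs, and the engine's real recovery-checkpoint computation may differ:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Minimal sketch (hypothetical names, not an Apex API): when one partition fails,
// every partition of the operator rolls back to the newest checkpoint they all share.
final class CommonCheckpoint {

    /** Newest window id checkpointed by every partition, or -1 if there is none. */
    static long latestCommon(List<Set<Long>> checkpointsPerPartition) {
        if (checkpointsPerPartition.isEmpty()) {
            return -1L;
        }
        // Start from the first partition's checkpoints and intersect with every partition's set.
        TreeSet<Long> common = new TreeSet<>(checkpointsPerPartition.get(0));
        for (Set<Long> checkpoints : checkpointsPerPartition) {
            common.retainAll(checkpoints);
        }
        return common.isEmpty() ? -1L : common.last();
    }

    public static void main(String[] args) {
        // P1 ran faster and checkpointed window 30; P2 lags and has only reached 20.
        List<Set<Long>> checkpoints = List.of(Set.of(10L, 20L, 30L), Set.of(10L, 20L));
        long restartFrom = latestCommon(checkpoints);
        // Even if only P1 crashed, both partitions restart from window 20,
        // so P1 redoes the work it did after window 20 (the costlier recovery).
        System.out.println("restart all partitions from window " + restartFrom);
    }
}
```

With a fast P1 at window 30 and a lagging P2 at window 20, both restart from 20. This illustrates the "costlier recovery" side of the trade-off.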
Thks
Amol

On Thu, Feb 11, 2016 at 6:03 PM, Gaurav Gupta wrote:
> How will it work in the following scenario?
>
> Say operator O has two partitions, P1 and P2. P1 was processing faster than P2, so the StreamCodec decided to send more data to P1. P1 processes some windows, then crashes and restarts from the previous checkpoint. Now P1 comes up on a node that is slow, so it processes slowly and the StreamCodec decides to send less data to P1. In this case, won't the application lose data for some windows?
>
> Similarly, in the reverse scenario, I think there will be duplicate data.
>
> On Thu, Feb 11, 2016 at 5:46 PM, Pramod Immaneni wrote:
> > Inline
> >
> > On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas wrote:
> > > Comments inline.
> > >
> > > +1 overall as well, provided APEXCORE-339 is implemented first and it is documented that the mechanism should not be used with some stateful operators.
> > >
> > > On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni wrote:
> > > > Comments inline.
> > > >
> > > > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas wrote:
> > > > > Hey Pramod,
> > > > >
> > > > > I agree that if APEXCORE-339 is in place, it would work without redeploying containers for operators that are stateless, or for a subset of stateful operators.
> > > > >
> > > > > Addressing your previous questions:
> > > > >
> > > > > - The StatsListener can be used to see how far behind operators are. You could determine which window the operator is on, the number of tuples it has processed so far, or how long it takes to complete a window.
> > > >
> > > > What if tuples are of different sizes, and the number of tuples processed doesn't reflect how far ahead or behind a downstream partition is?
> > > > How is the information from the StatsListener made available to the codecs of the upstream partitions?
> > >
> > > What information can the Buffer Server provide that the StatsListener cannot?
> >
> > The stats information would have to be relayed down to the upstream operators. It's possible.
> >
> > > The StatsListener can trigger a repartition. The information in the StatsListener can be shared with the partitioner by setting the same object for both in populateDAG(). The partitioner can then compute the new StreamCodec. The mechanism by which the upstream would be updated with the new StreamCodec would have to be implemented, as it is currently not there.
> > >
> > > > > - Some examples of stateful operators that require repartitioning of state are the following:
> > > > >   - Deduper: after the stream codec is updated, the operator may let a previously seen value pass, because the partition now receiving that value did not receive it under the previous stream codec.
> > > > >   - A key-value store that holds aggregations for each key: multiple partitions would hold partial aggregations for a key, when each is expected to hold the complete aggregation.
> > > >
> > > > Agreed for the deduper. For the second case, a unifier is a better approach, so that you are not affected by key skew in general.
> > >
> > > This is not always possible. We can discuss this offline, since going into the details won't add much to the discussion here.
> >
> > Yes, not always.
> >
> > > > > Tim
> > > > >
> > > > > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni wrote:
> > > > > > Additionally, it can be treated as a non-idempotent stream for recovery.
> > > > > > Look at APEXCORE-339. In cases where the downstream partitions require some key-based partitioning, what you are suggesting would be a good approach, but it will require more complex logic in the StreamCodec to do both key-based and load-based partitioning.
> > > > > >
> > > > > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni wrote:
> > > > > > > How would you know how far behind partitions are without interacting with the BufferServer, as you were mentioning in the earlier email? Secondly, why would changing where the data is sent mandate re-partitioning if the downstream partitions can handle data with different keys?
> > > > > > >
> > > > > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas wrote:
> > > > > > > > Hey Pramod,
> > > > > > > >
> > > > > > > > I think that, in general and for recovery, the existing partitioning machinery can be reused to update the StreamCodec. The reason is that if the operator is stateful and changes are made to the StreamCodec, the state of the partitions will also have to be repartitioned. In this case the number of partitions remains the same; only the state of the partitions is reshuffled. State reshuffling in a fault-tolerant way is already handled by the dynamic partitioning logic, so it could be extended to update the StreamCodec as well.
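Key groups offer one way to combine key-based and load-based partitioning while also handling the state reshuffle Tim describes. Each key hashes to one of a fixed number of groups, and a rebalance only rewrites the group-to-partition table. The groups that change owner are then exactly the state (dedup history, per-key aggregates) that must be moved. This is a sketch under assumptions: `KeyGroupCodec`, its methods, and the greedy heaviest-first assignment are illustrative choices, not the Apex `StreamCodec` interface. The per-group load figures are illustrative inputs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not the Apex StreamCodec interface: keys hash to a fixed number
// of key groups (key affinity), and only the group -> partition table changes on rebalance.
final class KeyGroupCodec {
    private final int[] groupToPartition; // index = key group, value = partition

    KeyGroupCodec(int[] groupToPartition) {
        this.groupToPartition = groupToPartition.clone();
    }

    static int keyGroup(Object key, int numGroups) {
        return Math.floorMod(key.hashCode(), numGroups);
    }

    /** The same key always lands on the same partition until the table is rebalanced. */
    int getPartition(Object key) {
        return groupToPartition[keyGroup(key, groupToPartition.length)];
    }

    /**
     * Greedy rebalance with the partition count unchanged: heaviest group first, each
     * onto the currently least-loaded partition. Load could be bytes or processing time.
     */
    static KeyGroupCodec rebalance(long[] loadPerGroup, int numPartitions) {
        Integer[] order = new Integer[loadPerGroup.length];
        for (int g = 0; g < order.length; g++) {
            order[g] = g;
        }
        // Arrays.sort on objects is stable, so equal loads keep group order (deterministic).
        Arrays.sort(order, Comparator.comparingLong((Integer g) -> loadPerGroup[g]).reversed());
        long[] partitionLoad = new long[numPartitions];
        int[] assignment = new int[loadPerGroup.length];
        for (int g : order) {
            int target = 0;
            for (int p = 1; p < numPartitions; p++) {
                if (partitionLoad[p] < partitionLoad[target]) {
                    target = p;
                }
            }
            assignment[g] = target;
            partitionLoad[target] += loadPerGroup[g];
        }
        return new KeyGroupCodec(assignment);
    }

    /** Key groups whose state (dedup history, per-key aggregates) must move with the keys. */
    List<Integer> movedGroups(KeyGroupCodec next) {
        List<Integer> moved = new ArrayList<>();
        for (int g = 0; g < groupToPartition.length; g++) {
            if (groupToPartition[g] != next.groupToPartition[g]) {
                moved.add(g);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Four key groups on two partitions; hot groups 0 and 1 both start on partition 0.
        KeyGroupCodec before = new KeyGroupCodec(new int[] {0, 0, 1, 1});
        KeyGroupCodec after = rebalance(new long[] {40, 40, 10, 10}, 2);
        System.out.println("groups whose state must move: " + before.movedGroups(after));
    }
}
```

In this example the load starts at 80 vs. 20. After the rebalance each partition carries 50, and only groups 1 and 2 change owner, so only their state needs to be reshuffled.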
> > > > > > > >
> > > > > > > > If the operator is stateless, it may be possible to do this without redeploying any containers. But the way I am envisioning it, I think there would be a lot of difficult-to-handle corner cases for recovery.
> > > > > > > >
> > > > > > > > Tim
> > > > > > > >
> > > > > > > > On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni wrote:
> > > > > > > > > Comment inline.
> > > > > > > > >
> > > > > > > > > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas wrote:
> > > > > > > > > > +1 for the idea.
> > > > > > > > > >
> > > > > > > > > > Gaurav, this could be done idempotently in the same way that dynamic repartitioning is done idempotently. All the partitions are rolled back to a common checkpoint, and the new StreamCodec is applied from that point on. The statistics the StreamCodec is given are the statistics for the windows computed before the common checkpoint that the partitions are rolled back to.
> > > > > > > > > >
> > > > > > > > > > In fact, I think this feature could be added easily by avoiding the buffer server entirely and allowing the Partitioner to redefine the StreamCodec for the operator when definePartitions() is called.
> > > > > > > > >
> > > > > > > > > Are you saying this in the context of recovery or in general?
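Tim's approach rolls every partition back to a common checkpoint and feeds the new codec only statistics from windows before that checkpoint. This is what makes the routing reproducible on replay. The following is a minimal sketch, assuming the stats have already been reduced to integer weights and that the upstream re-emits the same tuples in the same order after rollback. `SnapshotRouter` is hypothetical, and smooth weighted round-robin is just one deterministic way to spread tuples by weight:

```java
import java.util.Arrays;

// Hypothetical sketch: weights are frozen from stats of windows up to the common checkpoint,
// and tuples are spread with smooth weighted round-robin. Routing depends only on the frozen
// weights and the tuple's position after the checkpoint, so a replay repeats it exactly.
final class SnapshotRouter {
    private final int[] weights;
    private final int[] current;
    private final int total;

    SnapshotRouter(int[] weightsAtCheckpoint) {
        weights = weightsAtCheckpoint.clone();
        current = new int[weights.length];
        int sum = 0;
        for (int w : weights) {
            sum += w;
        }
        total = sum;
    }

    /** Add each weight to its credit, pick the largest credit (lowest index on ties), charge it. */
    int next() {
        int best = 0;
        for (int i = 0; i < weights.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i;
            }
        }
        current[best] -= total;
        return best;
    }

    /** Destinations for the first n tuples emitted after the checkpoint. */
    static int[] route(int[] weightsAtCheckpoint, int n) {
        SnapshotRouter router = new SnapshotRouter(weightsAtCheckpoint);
        int[] destinations = new int[n];
        for (int t = 0; t < n; t++) {
            destinations[t] = router.next();
        }
        return destinations;
    }

    public static void main(String[] args) {
        // Index 0 = P1, index 1 = P2; assume P1 handled about 3x P2's load before the checkpoint.
        int[] weights = {3, 1};
        int[] firstRun = route(weights, 8);
        int[] replay = route(weights, 8); // after rollback: same weights, same tuple order
        System.out.println(Arrays.toString(firstRun) + " replay identical: "
                + Arrays.equals(firstRun, replay));
    }
}
```

Under these assumptions a replay sends every tuple to the same partition as before, so the case Gaurav raises (a tuple going to P1 before recovery and to P2 after) does not occur. The router ignores keys, so it only fits stateless operators or ones that tolerate keys moving.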
> > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Tim
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre wrote:
> > > > > > > > > > > Gaurav,
> > > > > > > > > > > It would not be idempotent per partition, but it will be across all partitions combined. In this case the user would have explicitly asked for such a pattern.
> > > > > > > > > > >
> > > > > > > > > > > Thks,
> > > > > > > > > > > Amol
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta wrote:
> > > > > > > > > > > > Pramod,
> > > > > > > > > > > >
> > > > > > > > > > > > How would it work with recovery? There could be cases where a tuple went to P1 and, after recovery, goes to P2.
> > > > > > > > > > > >
> > > > > > > > > > > > Gaurav
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni wrote:
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are scenarios where the downstream partitions of an upstream operator are generally not performing uniformly, resulting in overall sub-optimal performance dictated by the slowest partitions.
> > > > > > > > > > > > > The reasons could be data-related, such as some partitions receiving more data to process than others, or environment-related, such as some partitions running slower than others because they are on heavily loaded nodes.
> > > > > > > > > > > > >
> > > > > > > > > > > > > A solution based on currently available functionality in the engine would be to write a StreamCodec implementation that distributes data among the partitions so that each partition receives a similar amount of data to process. We should consider adding StreamCodecs like these to the library, but they do not solve the problem when it is environment-related.
> > > > > > > > > > > > >
> > > > > > > > > > > > > For that, a better and more comprehensive approach would be to look at how data is being consumed by the downstream partitions from the BufferServer and use that information to decide how to send future data. If some partitions are behind others in consuming data, then data can be directed to the other partitions.
> > > > > > > > > > > > > One way to do this would be to relay this type of statistical and positional information from the BufferServer to the upstream publishers. The publishers can use this information in various ways, such as making it available to StreamCodecs to affect the destination of future data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > What do you think?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks
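The proposal above can be sketched from the publisher's side. The publisher counts the bytes it has sent to each downstream partition and receives, via some relay from the BufferServer (hypothetical here), how many bytes each subscriber has consumed. It then routes each new tuple to the partition with the smallest backlog. Backlog is measured in bytes rather than tuples because, as raised in the thread, tuple sizes can differ. `BacklogAwareRouter` and its methods are illustrative names, not an existing Apex API:

```java
// Hypothetical sketch: the publisher tracks bytes sent per downstream partition, the
// buffer server (hypothetically) relays bytes consumed per subscriber, and new tuples
// go to the partition with the smallest unconsumed backlog.
final class BacklogAwareRouter {
    private final long[] publishedBytes;
    private final long[] consumedBytes;

    BacklogAwareRouter(int partitions) {
        publishedBytes = new long[partitions];
        consumedBytes = new long[partitions];
    }

    /** Positional feedback; reports may arrive late or out of order, so keep the maximum. */
    void reportConsumed(int partition, long bytesConsumedSoFar) {
        consumedBytes[partition] = Math.max(consumedBytes[partition], bytesConsumedSoFar);
    }

    /** Bytes sent to a partition but not yet consumed by it. */
    long backlog(int partition) {
        return publishedBytes[partition] - consumedBytes[partition];
    }

    /** Picks the partition with the smallest backlog (lowest index on ties) and records the send. */
    int route(int tupleSizeBytes) {
        int target = 0;
        for (int p = 1; p < publishedBytes.length; p++) {
            if (backlog(p) < backlog(target)) {
                target = p;
            }
        }
        publishedBytes[target] += tupleSizeBytes;
        return target;
    }

    public static void main(String[] args) {
        BacklogAwareRouter router = new BacklogAwareRouter(2);
        System.out.println(router.route(100)); // both empty: prints 0
        System.out.println(router.route(100)); // partition 0 has a backlog: prints 1
        router.reportConsumed(1, 100);          // partition 1 has caught up
        System.out.println(router.route(50));  // prints 1
        System.out.println(router.route(50));  // backlog 50 < 100: prints 1
        System.out.println(router.route(10));  // tie at 100 bytes each: prints 0
    }
}
```

These choices depend on live consumption feedback, so replaying the same input after a failure can route tuples differently. That is why such a stream would have to be treated as non-idempotent for recovery (APEXCORE-339). The router also ignores keys, so it fits stateless operators only.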
