Additionally, it can be treated as a non-idempotent stream for recovery; see APEXCORE-339. In cases where the downstream partitions require key-based partitioning, what you are suggesting would be a good approach, but it will require more complex logic in the StreamCodec to do both key-based and load-based partitioning.
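For illustration only, here is a minimal, language-neutral sketch (in Python rather than Apex's Java) of what combining key-based and load-based routing might involve. It makes several assumptions. Keys are hashed into a fixed number of buckets (`NUM_BUCKETS` is a hypothetical parameter). A hypothetical `rebalance()` step runs only where partition state can be reshuffled, such as a repartition from a common checkpoint. Between rebalances a key always maps to the same partition. At a rebalance, buckets are reassigned greedily by observed load. None of these names are part of the Apex `StreamCodec` API.

```python
import zlib
from typing import List, Sequence

# Hypothetical parameter: number of hash buckets. It should exceed the partition
# count so load can be spread at a finer grain than one bucket per partition.
NUM_BUCKETS = 16


def bucket_of(key: str) -> int:
    # crc32 is stable across processes, unlike Python's salted built-in hash() for str.
    return zlib.crc32(key.encode("utf-8")) % NUM_BUCKETS


def build_table(bucket_load: Sequence[int], num_partitions: int) -> List[int]:
    """Greedy longest-processing-time assignment: heaviest bucket first,
    each to the partition with the least load assigned so far."""
    partition_load = [0] * num_partitions
    table = [0] * len(bucket_load)
    for b in sorted(range(len(bucket_load)), key=lambda i: (-bucket_load[i], i)):
        target = min(range(num_partitions), key=lambda p: (partition_load[p], p))
        table[b] = target
        partition_load[target] += bucket_load[b]
    return table


class KeyAndLoadRouter:
    """Hypothetical routing helper; not the Apex StreamCodec interface."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        # Start with buckets spread round-robin, like plain hash partitioning.
        self.table = [b % num_partitions for b in range(NUM_BUCKETS)]
        self.bucket_load = [0] * NUM_BUCKETS

    def partition_for(self, key: str) -> int:
        b = bucket_of(key)
        self.bucket_load[b] += 1  # statistics consumed by the next rebalance
        return self.table[b]

    def rebalance(self) -> List[int]:
        # Assumed to run only where partition state can be moved along with the
        # buckets (e.g. during a checkpoint-based repartition).
        self.table = build_table(self.bucket_load, self.num_partitions)
        self.bucket_load = [0] * NUM_BUCKETS
        return self.table
```

Keeping the bucket-to-partition table fixed between rebalances preserves key affinity. That is also why stateful partitions would still need their state moved whenever the table changes.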
On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni <[email protected]> wrote:
> How would you know how far behind partitions are without interacting with BufferServer, as you were mentioning in the earlier email? Secondly, why would changing where the data is sent, based on load, mandate re-partitioning if the downstream partitions can handle data with different keys?
>
> On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas <[email protected]> wrote:
>
>> Hey Pramod,
>>
>> I think that, in general and for recovery, the existing partitioning machinery can be reused to update the StreamCodec.
>> The reason is that if the operator is stateful and changes are made to the StreamCodec, the state of the partitions will also have to be repartitioned.
>> In this case the number of partitions remains the same; only the state of the partitions is reshuffled. Fault-tolerant state reshuffling is already handled by the dynamic partitioning logic, so it could be extended to update the StreamCodec as well.
>>
>> If the operator is stateless, it may be possible to do this without redeploying any containers. But with the way I am envisioning it, I think there would be a lot of difficult-to-handle corner cases for recovery.
>>
>> Tim
>>
>> On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni <[email protected]> wrote:
>>
>> > Comment inline.
>> >
>> > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas <[email protected]> wrote:
>> >
>> > > +1 for the idea.
>> > >
>> > > Gaurav, this could be done idempotently in the same way that dynamic repartitioning is done idempotently. All the partitions are rolled back to a common checkpoint and the new StreamCodec is applied from that point on. The statistics given to the StreamCodec are those for the windows computed before the common checkpoint that the partitions are rolled back to.
>> > >
>> > > In fact, I think this feature could be added easily by avoiding BufferServer entirely and allowing the Partitioner to redefine the StreamCodec for the operator when definePartitions is called.
>> > >
>> >
>> > Are you saying this in the context of recovery or in general?
>> >
>> > >
>> > > Thanks,
>> > > Tim
>> > >
>> > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre <[email protected]> wrote:
>> > >
>> > > > Gaurav,
>> > > > It would not be idempotent per partition, but it would be across all partitions combined. In this case the user would have explicitly asked for such a pattern.
>> > > >
>> > > > Thks,
>> > > > Amol
>> > > >
>> > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta <[email protected]> wrote:
>> > > >
>> > > > > Pramod,
>> > > > >
>> > > > > How would it work with recovery? There could be cases where a tuple went to P1 and, after recovery, goes to P2.
>> > > > >
>> > > > > Gaurav
>> > > > >
>> > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni <[email protected]> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > There are scenarios where the downstream partitions of an upstream operator do not perform uniformly, resulting in overall sub-optimal performance dictated by the slowest partitions. The reasons could be data-related, such as some partitions receiving more data to process than others, or environment-related, such as some partitions running slower than others because they are on heavily loaded nodes.
>> > > > > >
>> > > > > > A solution based on functionality currently available in the engine would be to write a StreamCodec implementation that distributes data among the partitions so that each partition receives a similar amount of data to process. We should consider adding StreamCodecs like these to the library, but they do not solve the problem when it is environment-related.
>> > > > > >
>> > > > > > For that, a better and more comprehensive approach would be to look at how data is being consumed by the downstream partitions from the BufferServer and use that information to decide how to send future data. If some partitions are behind others in consuming data, then data can be directed to the other partitions. One way to do this would be to relay this type of statistical and positional information from the BufferServer to the upstream publishers. The publishers could then use this information, for example by making it available to StreamCodecs to affect the destination of future data.
>> > > > > >
>> > > > > > What do you think?
>> > > > > >
>> > > > > > Thanks
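The BufferServer-based idea in the original proposal can be modeled as a publisher that tracks how far each downstream partition lags behind. The following is a minimal sketch under stated assumptions. The stream has no key affinity, so any partition may process any tuple. Consumed positions are relayed to the publisher somehow; the `report_consumed` hook stands in for that relay and is hypothetical, since the thread only proposes it. Each tuple goes to the partition with the smallest lag, with ties broken by partition index.

```python
from typing import List


class LagAwareRouter:
    """Hypothetical publisher-side router for a stream without key affinity."""

    def __init__(self, num_partitions: int) -> None:
        self.published: List[int] = [0] * num_partitions  # tuples sent to each partition
        self.consumed: List[int] = [0] * num_partitions   # latest consumed position reported

    def report_consumed(self, partition: int, position: int) -> None:
        # Stand-in for positional info relayed from the BufferServer (hypothetical).
        # Ignore stale reports and clamp to what was actually published.
        position = min(position, self.published[partition])
        self.consumed[partition] = max(self.consumed[partition], position)

    def lag(self, partition: int) -> int:
        return self.published[partition] - self.consumed[partition]

    def route(self) -> int:
        # Send the next tuple to the partition with the least unconsumed data.
        target = min(range(len(self.published)), key=lambda p: (self.lag(p), p))
        self.published[target] += 1
        return target


router = LagAwareRouter(num_partitions=2)
print([router.route() for _ in range(4)])  # [0, 1, 0, 1] (no reports yet)
router.report_consumed(0, 2)  # partition 0 caught up; partition 1 has consumed nothing
print([router.route() for _ in range(2)])  # [0, 0]
```

Routing depends on runtime lag reports. Replaying the same tuples after recovery can therefore send them to different partitions, which is the concern Gaurav raised. Any idempotence would hold only across all partitions combined, as Amol noted.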
