Chandni: Good point on ordering. We need to handle that case. Ordering is also not guaranteed when the upstream operator has multiple instances.
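To make the ordering problem concrete, here is a minimal sketch of Priyanka's windowId/batchId recovery idea (quoted later in this thread). It assumes an in-memory stand-in for the database, where the rows and the (windowId, batchId) marker of each batch would be committed in one transaction. `BatchIdReplay`, `Store`, `processWindow` and `BATCH_SIZE` are hypothetical names, not Malhar APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchIdReplay {
  static final int BATCH_SIZE = 2;

  // Hypothetical in-memory store: data rows plus the last committed (windowId, batchId).
  static class Store {
    final List<String> rows = new ArrayList<>();
    long committedWindow = -1;
    int committedBatch = 0; // id of the last committed batch of committedWindow (ids start at 1)
  }

  // Writes one window in fixed-size batches, skipping batches already marked as committed.
  // crashAfter >= 0 simulates a failure before batch (crashAfter + 1) is committed; -1 means no failure.
  static void processWindow(Store store, long windowId, List<String> tuples, int crashAfter) {
    int batchId = 0;
    for (int start = 0; start < tuples.size(); start += BATCH_SIZE) {
      batchId++;
      if (windowId == store.committedWindow && batchId <= store.committedBatch) {
        continue; // assumed to be in the store already, so skip it on replay
      }
      if (crashAfter >= 0 && batchId > crashAfter) {
        return; // simulated failure
      }
      List<String> batch = tuples.subList(start, Math.min(start + BATCH_SIZE, tuples.size()));
      // In a real store, the rows and the marker would be written in one transaction.
      store.rows.addAll(batch);
      store.committedWindow = windowId;
      store.committedBatch = batchId;
    }
  }

  // Runs window 10 once with a failure after batch 1, then replays it in the given order.
  static List<String> run(List<String> replayOrder) {
    Store store = new Store();
    processWindow(store, 10, List.of("a", "b", "c", "d"), 1);
    processWindow(store, 10, replayOrder, -1);
    return store.rows;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of("a", "b", "c", "d"))); // [a, b, c, d]
    System.out.println(run(List.of("c", "a", "d", "b"))); // [a, b, d, b]
  }
}
```

When the replay arrives in the original order, the result is correct. When the same tuples arrive in a different order, "b" is written twice and "c" is lost. Batch ids therefore only identify the right tuples if the order within the window is deterministic.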
Regards,
Sandeep

On Tue, Dec 29, 2015 at 8:39 PM, Chandni Singh wrote:

I agree with Sandeep that at-least-once is not the right approach for databases. All the output adaptors we have in the library are written so that they do not write duplicate entries, for example FileOutputOperator. This has nothing to do with the processing mode feature offered by the platform: when writing to any external entity, we have to ensure that we do not introduce duplicates.

Priyanka,
I am assuming that by batch id you mean the number of tuples in the app window that have already been persisted. Otherwise every tuple would need a batch id, so that when a window is replayed you know which tuples to discard.

We have followed that approach in some concrete implementations in POCs. One of the reasons we haven't added it here is that it relies on the order of tuples within an application window, which again depends on the upstream operator. For example, in the default mode of dedup, the ordering of tuples within an application window is not guaranteed.

Chandni

On Tue, Dec 29, 2015 at 4:53 AM, Priyanka Gugale wrote:

One more option:

We can keep track of windowId and batchId, i.e. we save the batchId along with the windowId when we commit a batch within a window. For example, if we are in window 10 and have written the second batch of window 10, we commit windowId=10 and batchId=2 to the DB. During recovery, we won't reprocess the batches of the last window that are marked as committed.

-Priyanka

On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh wrote:

Not sure if "at least once" is the right behavior for databases. We may not always have a primary key to update or insert on.
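The primary-key point can be illustrated with a small simulation of at-least-once recovery, where a window is written once and then replayed in full after a failure. This is a sketch under the assumption that the replayed statements write absolute values; the class, record and method names are hypothetical, and a `Map` and a `List` stand in for a keyed and a keyless table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplayIdempotency {
  record Tuple(String key, int value) {}

  // Keyed table: an upsert overwrites the row with the same primary key, so a replay converges.
  static Map<String, Integer> upsertTwice(List<Tuple> window) {
    Map<String, Integer> table = new LinkedHashMap<>();
    for (int attempt = 0; attempt < 2; attempt++) { // original run + replay after a failure
      for (Tuple t : window) {
        table.put(t.key(), t.value());
      }
    }
    return table;
  }

  // Keyless table: a plain insert appends, so the replay duplicates every row of the window.
  static List<Tuple> insertTwice(List<Tuple> window) {
    List<Tuple> table = new ArrayList<>();
    for (int attempt = 0; attempt < 2; attempt++) {
      table.addAll(window);
    }
    return table;
  }

  public static void main(String[] args) {
    List<Tuple> window = List.of(new Tuple("k1", 1), new Tuple("k2", 2));
    System.out.println(upsertTwice(window).size()); // 2
    System.out.println(insertTwice(window).size()); // 4
  }
}
```

Even with a primary key, a replay is only harmless when the statement is idempotent. An update such as `count = count + 1` would still be applied twice.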
Regards,
Sandeep

On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale wrote:

Hi,

Thanks for your inputs, Chandni. I guess what you are suggesting is similar to AbstractJdbcNonTransactionableBatchOutputOperator, which is a batch non-transactional operation. That is one good option.

I am also thinking of the possibility of having "at least once" behavior with a transactional store. Here we keep committing batches within a window, and each batch commit is a transaction. On recovery we start processing from the last committed window (without excluding the last committed window, as it could be partially written). If the queries are update or insert queries using a primary key, replaying the insert/update commands shouldn't be a problem, because they will have the same effect on the database (of course, this is not applicable to all use cases). Does this look better?

-Priyanka

On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh wrote:

Yeah, I understand there is a problem that the app window size here is time based, not based on the number of events. However, I don't think having a max batch size in this class will help, because that causes problems with saving the tuples exactly once and with idempotency.

I was just trying to let you know why the batch transactional store is the way it is.

Check out the non-transactional store output operator (AbstractStoreOutputOperator) and its implementations, where the window id is saved with each update.
I think a batch extension of that can achieve what is needed here while keeping the operator fault-tolerant and idempotent.

Thanks,
Chandni

On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar wrote:

Hi Chandni,

I totally agree with you that the transactions should be idempotent, and that needs to be taken care of if the batch size is configurable.

However, I have a question about the second part, where the batch size is controlled by controlling the app window size. I agree with you that the aggregation window is a unit of aggregation provided by the platform. But, if I understand correctly, it is time based. If I want to aggregate based on the number of tuples, would this be suitable?

To give you an example, let's say I have a store on which a transaction should never exceed 1000 operations. In a streaming application it is difficult to guess the input rate, hence it's not possible to guess how many tuples will become part of a single application window. In such a case, how can the application window size be used to configure the transaction batch size? Wouldn't it make more sense to control it via an exact number of tuples?

Thanks,
Chinmay.

~ Chinmay.

On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh wrote:

Hey Chinmay/Priyanka,

We need to write tuples exactly once in the store.
Please address the failure scenarios: how do we achieve exactly-once and idempotency? I mentioned in my previous mail why multiple batches in a window are a problem for exactly-once.

> Control via the app window would mean tuning the functionality by controlling platform parameters. I think it's best to give the option to separate the concerns of the platform from those of the app logic.

Application window is a unit of aggregation. Every operator in a DAG can have a different application window; this is the support the platform provides for application logic.

Chandni

On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar wrote:

Hi,

Just a thought on how it can possibly be done. The pseudocode might look like this:

processTuple(tuple)
{
  // add the tuple to the batch first, so it is never dropped
  if (batch.size() >= configuredBatchSize) {
    // process the batch as a transaction
    // empty the batch data structure
  }
}

endWindow()
{
  // process the remaining batch (if any) as a transaction
  // empty the batch data structure
}

This way, the user gets better, more direct control over what a transaction means.
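A runnable Java sketch of this size-capped batching follows. It adds every tuple before checking the size, so the tuple that fills a batch is never lost, and it flushes any remainder at `endWindow()`. `CappedBatcher`, `maxBatchSize` and the `commitTransaction` callback are hypothetical stand-ins. This is not a Malhar class, and the callback only represents "process the batch as a transaction".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CappedBatcher<T> {
  private final int maxBatchSize;
  private final Consumer<List<T>> commitTransaction; // stands in for one database transaction
  private final List<T> batch = new ArrayList<>();

  public CappedBatcher(int maxBatchSize, Consumer<List<T>> commitTransaction) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.commitTransaction = commitTransaction;
  }

  public void processTuple(T tuple) {
    batch.add(tuple); // always keep the incoming tuple
    if (batch.size() >= maxBatchSize) {
      flush();
    }
  }

  public void endWindow() {
    flush(); // commit whatever is left at the window boundary
  }

  private void flush() {
    if (batch.isEmpty()) {
      return;
    }
    commitTransaction.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
    batch.clear();
  }

  public static void main(String[] args) {
    List<List<Integer>> committed = new ArrayList<>();
    CappedBatcher<Integer> batcher = new CappedBatcher<>(3, committed::add);
    for (int i = 1; i <= 7; i++) {
      batcher.processTuple(i);
    }
    batcher.endWindow();
    System.out.println(committed); // [[1, 2, 3], [4, 5, 6], [7]]
  }
}
```

This only bounds the size of each transaction. As Chandni points out in the thread, once a window spans several transactions, exactly-once recovery needs extra bookkeeping, such as a window id per tuple or ordered batch ids.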
As Chandni rightly said, one can reduce the application window size for the operator, and that would reduce the batch size. But that doesn't look intuitive from the user's perspective. Control via the app window would mean tuning the functionality by controlling platform parameters. I think it's best to give the option to separate the concerns of the platform from those of the app logic.

If one wants to control the batch size, they should be able to do that just by setting a batch size property (a number), not by changing the app window size (an indirect, time-based unit).

~ Chinmay

On 28 Dec 2015 22:53, "Chandni Singh" wrote:

But you will not allow multiple batches in the same window? Can you please elaborate on the failure scenarios and how they affect idempotency?

Chandni

On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale wrote:

Hi,

Sorry if I was not clear, but I am proposing a MAX_SIZE per window, i.e. the maximum number of tuples the operator could process in a window. The actual number could be less than MAX_SIZE; there is no restriction on that.
-Priyanka

On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh wrote:

How do you propose to restrict the number of tuples processed in an application window to less than the batch size?

I don't see a way to enforce that the number of tuples processed in an application window never exceeds the batch size.

On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale wrote:

Hi Chandni,

How about restricting the number of tuples that can be processed per window? If someone wants to process small, frequent batches, they can set the batch size to a small value and also reduce the window size. This would of course build some back pressure, but that could be acceptable if one really wants to restrict the batch size.

The thought was triggered while working on the Cassandra output operator. Cassandra has problems processing batches larger than some size (I don't recall the exact number right now). Other databases may want to restrict the batch size for similar or other reasons.
-Priyanka

On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh wrote:

Priyanka,

AbstractBatchTransactionableStore treats all tuples in one application window as a batch because it needs to store the tuples in the store exactly once.

If there is more than one batch in an application window, then to store the tuples exactly once the window id needs to be written with every tuple as well, which is not that efficient. Therefore we take advantage of transaction support by saving the window id just once (not with every tuple), but this requires all the tuples of the window to be treated as one batch.

Every operator in a DAG can have its own application window size, so to reduce the size of each batch, the application window attribute needs to be modified.
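The one-transaction-per-window scheme described above can be sketched with an in-memory simulation. It assumes that the data rows and the committed window id are written atomically. `WindowTransactionStore`, `commitWindow` and `commitWindowInTwoBatches` are hypothetical names, not the Malhar implementation. The second method splits a window into two transactions and writes the window id only with the last one, which shows why a failure between the two breaks exactly-once.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowTransactionStore {
  final List<String> rows = new ArrayList<>();
  long committedWindowId = -1;

  // One atomic unit per window (simulated): rows plus the window id.
  void commitWindow(long windowId, List<String> tuples) {
    if (windowId <= committedWindowId) {
      return; // committed before the failure, so the replay is skipped
    }
    rows.addAll(tuples);
    committedWindowId = windowId;
  }

  // The same window split into two transactions; only the second one records the window id.
  void commitWindowInTwoBatches(long windowId, List<String> tuples, boolean failAfterFirst) {
    if (windowId <= committedWindowId) {
      return;
    }
    int mid = tuples.size() / 2;
    rows.addAll(tuples.subList(0, mid)); // transaction 1: data only
    if (failAfterFirst) {
      return; // simulated crash before transaction 2
    }
    rows.addAll(tuples.subList(mid, tuples.size())); // transaction 2: data + window id
    committedWindowId = windowId;
  }

  public static void main(String[] args) {
    List<String> window5 = List.of("a", "b", "c", "d");

    WindowTransactionStore single = new WindowTransactionStore();
    single.commitWindow(5, window5);
    single.commitWindow(5, window5); // replay of an already committed window
    System.out.println(single.rows); // [a, b, c, d]

    WindowTransactionStore split = new WindowTransactionStore();
    split.commitWindowInTwoBatches(5, window5, true);  // crash between the two transactions
    split.commitWindowInTwoBatches(5, window5, false); // replay of window 5
    System.out.println(split.rows); // [a, b, a, b, c, d]
  }
}
```

Writing the window id with the first transaction instead would make the replay skip the whole window, losing "c" and "d". Either way, splitting a window needs per-tuple or per-batch bookkeeping to stay exactly-once.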
Chandni

On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar wrote:

+1 for this.

~ Chinmay.

On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale wrote:

Hi,

In Malhar we have an operator, AbstractBatchTransactionableStoreOutputOperator, which creates batches based on the tuples received in a window. At the end of the window these batches are sent to the database for processing. There is no way to configure a MAX_SIZE for these batches. Depending on the input rate, the batches can grow very large, and we might want to restrict the batch size.

Any operator can extend it and do batch management on its own, but I see this as a generic requirement, and IMO we should change the base class, i.e. AbstractBatchTransactionableStoreOutputOperator, to accept a MAX_SIZE for batches from outside.

Any opinion on this?

-Priyanka
