I agree with Sandeep that at-least-once with databases is not the right approach. All the output adaptors we have in the library are written so that they do not write duplicate entries, for example FileOutputOperator. This has nothing to do with the processing-mode feature offered by the platform: when writing to any external entity we have to ensure that we do not introduce duplicates.
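To make the duplicate-avoidance pattern concrete, here is a minimal sketch of the general idea, not the actual FileOutputOperator and not tied to the real Malhar class hierarchy: the operator persists the id of the last fully written window in the external store itself and, when windows are replayed after a failure, skips tuples from windows that were already committed. ExternalStore, its methods, and the lifecycle method names below are illustrative assumptions.

    // Schematic sketch of idempotent (exactly-once) output, not actual Malhar code.
    // ExternalStore and its methods are hypothetical placeholders.
    public class IdempotentOutputSketch<T>
    {
      interface ExternalStore<T>
      {
        long getCommittedWindowId();   // last window fully written to the store
        void write(T tuple);           // buffered write
        void commit(long windowId);    // atomically persist data + windowId
      }

      private final ExternalStore<T> store;
      private long committedWindowId;
      private long currentWindowId;
      private boolean replaying;

      public IdempotentOutputSketch(ExternalStore<T> store)
      {
        this.store = store;
      }

      public void setup()
      {
        // Recover the largest window id that was fully written before the failure.
        committedWindowId = store.getCommittedWindowId();
      }

      public void beginWindow(long windowId)
      {
        currentWindowId = windowId;
        // Tuples of an already committed window are being replayed; don't write them again.
        replaying = windowId <= committedWindowId;
      }

      public void processTuple(T tuple)
      {
        if (!replaying) {
          store.write(tuple);
        }
      }

      public void endWindow()
      {
        if (!replaying) {
          // Data and window id are persisted together, so after a crash a replay
          // either sees the old window id (and rewrites) or the new one (and skips).
          store.commit(currentWindowId);
          committedWindowId = currentWindowId;
        }
      }
    }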
Priyanka, I am assuming that the batch id you mean here is the number of tuples in the app window which are persisted; otherwise every tuple needs a batch id so that when a window is replayed you know which tuples to discard. We have followed that approach in some concrete implementations in POCs. One of the reasons we haven't added it here is that it relies on an ordering of tuples within an application window, which in turn depends on the upstream operator. For example, in the default mode of dedup, the ordering of tuples within an application window is not guaranteed.

Chandni

On Tue, Dec 29, 2015 at 4:53 AM, Priyanka Gugale <[email protected]> wrote:
> One more option:
>
> We can keep track of windowId and batchId, i.e. we save the batchId along with the windowId when we commit a batch within a window. For example, if we are in window 10 and have written the second batch of window 10, we commit windowId=10 and batchId=2 to the DB. During recovery we won't reprocess the batches of the last window that are marked as committed.
>
> -Priyanka

On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <[email protected]> wrote:
> Not sure if "at least once" is the right behavior for databases. We may not always have a primary key to update or insert.
>
> Regards,
> Sandeep

On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <[email protected]> wrote:
> Hi,
>
> Thanks for your inputs, Chandni. I guess what you are suggesting is similar to AbstractJdbcNonTransactionableBatchOutputOperator, which does batched non-transactional operations. That is one good option.
>
> I am also thinking of the possibility of having "at least once" behavior with a transactional store. In this approach we keep committing batches within a window, and each batch commit is a transaction. On recovery we start processing from the last committed window (we don't exclude the last committed window, as it could be partially written). If the queries are updates or inserts using a primary key, it shouldn't be a problem if we replay the insert/update command: it has the same effect on the database (of course this is not applicable to all use cases). Does this look better?
>
> -Priyanka

On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <[email protected]> wrote:
> Yeah, I understand there is a problem in that the app window size here is time based, not based on the number of events. However, I don't think having a max batch size in this class will help, because that causes problems with saving the tuples exactly once and with idempotency.
>
> I was just trying to let you know why the batch transactional store is the way it is.
>
> Check out the non-transactional store output operator (AbstractStoreOutputOperator) and its implementations, where the window id is saved with each update. I think a batch extension of that can achieve what is needed here in a way that keeps the operator fault-tolerant and idempotent.
>
> Thanks,
> Chandni

On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <[email protected]> wrote:
> Hi Chandni,
>
> I totally agree with you that the transactions should be idempotent, and that needs to be taken care of if the batch size is configurable.
>
> Though, I have a question related to the second part, where the batch size is controlled by controlling the app window size. I agree with you that the application window is a unit of aggregation provided by the platform. But, if I understand correctly, that is time based. If I want to aggregate based on the number of tuples, would this be suitable?
>
> To give you an example, let's say I have a store on which the transaction size should never exceed 1000 operations. As this is a streaming application, it would be difficult to guess the input rate, hence it is not possible to guess how many tuples will become part of a single application window. In such a case, how can the application window size be used to configure the transaction batch size? Wouldn't it make more sense to have the control via an exact number of tuples?
>
> Thanks,
> Chinmay

On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <[email protected]> wrote:
> Hey Chinmay/Priyanka,
>
> We need to write tuples exactly once in the store. Please address the failure scenarios and how to achieve exactly-once and idempotency. I mentioned in my previous mail why multiple batches in a window are a problem for exactly-once.
>
> Control via app window would mean tuning the functionality by controlling the platform params. I think it's best one gets the option to separate the concerns of the platform and those of the app logic.
>
> Application window is a unit of aggregation. Every operator in a DAG can have a different application window, which is the support the platform provides for application logic.
>
> Chandni

On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <[email protected]> wrote:
> Hi,
>
> Just a thought on how it can possibly be done. The pseudo code might look like this:
>
> processTuple()
> {
>   if (batchSize < configuredBatchSize) {
>     // add to the batch
>   } else {
>     // process the batch as a transaction
>     // empty the data structure of the batch
>   }
> }
>
> endWindow()
> {
>   // process the batch as a transaction
>   // empty the data structure of the batch
> }
>
> This way the user gets better/more direct control over what a transaction means.
>
> As Chandni rightly said, one can reduce the application window size for the operator, and that would reduce the batch size. But that's not something which looks intuitive from the user's perspective. Control via the app window would mean tuning the functionality by controlling the platform params. I think it's best one gets the option to separate the concerns of the platform and those of the app logic.
>
> If one wants to control the batch size, he/she should be able to do that just by setting a batch size property (a number), and not by changing the app window size (an indirect time unit).
>
> ~ Chinmay

On 28 Dec 2015 22:53, "Chandni Singh" <[email protected]> wrote:
> But you will not allow multiple batches in the same window? Can you please elaborate on the failure scenarios and how this affects idempotency.
>
> Chandni

On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <[email protected]> wrote:
> Hi,
>
> Sorry if I was not clear, but I am trying to propose a MAX_SIZE per window which the operator could process. The size could be less than MAX_SIZE; there is no restriction about that.
>
> -Priyanka

On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <[email protected]> wrote:
> How do you propose to restrict the number of tuples processed in an application window to less than the batch size? I don't see a way to enforce that the batch size can never be less than the number of tuples processed in an application window.

On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <[email protected]> wrote:
> Hi Chandni,
>
> How about restricting the number of tuples which can be processed per window? If someone wants to process small and frequent batches, they can set the batch size to some small value and also reduce the window size. This would build some back pressure, of course, but that could be acceptable if one really wants to restrict the batch size.
>
> The thought was triggered while working on the Cassandra output operator. Cassandra has problems processing batches larger than some size (I don't recall the exact number right now). Other databases may want to restrict the batch size for similar or other reasons.
>
> -Priyanka

On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <[email protected]> wrote:
> Priyanka,
>
> AbstractBatchTransactionableStore treats all the tuples in one application window as a batch because it needs to store the tuples in the store exactly once.
>
> If there is more than one batch in an application window, then to store the tuples exactly once the window id needs to be written with every tuple as well, which is not that efficient. Therefore we take advantage of the transaction support by saving just the window id once (not with every tuple), but this necessitates that all the tuples be considered as one batch.
>
> Every operator in a DAG can have its own application window size. So to reduce the size per batch, the application window attribute needs to be modified.
>
> Chandni

On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar <[email protected]> wrote:
> +1 for this.
>
> ~ Chinmay

On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale <[email protected]> wrote:
> Hi,
>
> In Malhar we have an operator, AbstractBatchTransactionableStoreOutputOperator, which creates batches based on the tuples received in a window. At the end of the window these batches are sent to the database for processing. There is no way to configure a MAX_SIZE on these batches; based on the input rate the batch sizes can grow very large, and we might want to restrict the batch size.
>
> Any operator can extend it and do batch management on its own, but I see this as a generic requirement, and IMO we should change the base class, i.e. AbstractBatchTransactionableStoreOutputOperator, to accept a MAX_SIZE for the batch from outside.
>
> Any opinion on this?
>
> -Priyanka
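For readers following the windowId/batchId idea discussed above, here is one possible sketch of how it could be combined with a configurable MAX_SIZE, assuming a JDBC store and a single-row meta table dt_batch_meta(window_id, batch_id) that is updated in the same transaction as the data. The table, column, and class names are hypothetical; this is not an existing Malhar operator.

    // Hypothetical sketch of the windowId + batchId idea: each batch commit also
    // records (windowId, batchId) in a meta table inside the same JDBC transaction,
    // so replayed batches of the last window can be skipped on recovery.
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    public class WindowBatchOutputSketch
    {
      private final Connection con;            // auto-commit disabled by the caller
      private final int maxBatchSize;          // the MAX_SIZE proposed in this thread
      private final List<String> batch = new ArrayList<>();

      private long currentWindowId;
      private int currentBatchId;
      private long committedWindowId;          // recovered from dt_batch_meta
      private int committedBatchId;            // recovered from dt_batch_meta

      public WindowBatchOutputSketch(Connection con, int maxBatchSize) throws SQLException
      {
        this.con = con;
        this.maxBatchSize = maxBatchSize;
        // Assumes dt_batch_meta holds exactly one row, seeded at deployment time.
        try (PreparedStatement ps = con.prepareStatement(
            "SELECT window_id, batch_id FROM dt_batch_meta");
            ResultSet rs = ps.executeQuery()) {
          if (rs.next()) {
            committedWindowId = rs.getLong(1);
            committedBatchId = rs.getInt(2);
          }
        }
      }

      public void beginWindow(long windowId)
      {
        currentWindowId = windowId;
        currentBatchId = 0;
      }

      public void processTuple(String value) throws SQLException
      {
        batch.add(value);
        if (batch.size() >= maxBatchSize) {
          flushBatch();
        }
      }

      public void endWindow() throws SQLException
      {
        if (!batch.isEmpty()) {
          flushBatch();
        }
      }

      private void flushBatch() throws SQLException
      {
        currentBatchId++;
        // Skip batches that were already committed before a crash and are now replayed.
        boolean alreadyCommitted = currentWindowId < committedWindowId
            || (currentWindowId == committedWindowId && currentBatchId <= committedBatchId);
        if (!alreadyCommitted) {
          try (PreparedStatement insert = con.prepareStatement(
              "INSERT INTO dt_data (value) VALUES (?)");
              PreparedStatement meta = con.prepareStatement(
              "UPDATE dt_batch_meta SET window_id = ?, batch_id = ?")) {
            for (String value : batch) {
              insert.setString(1, value);
              insert.addBatch();
            }
            insert.executeBatch();
            meta.setLong(1, currentWindowId);
            meta.setInt(2, currentBatchId);
            meta.executeUpdate();
            con.commit();                    // data and (windowId, batchId) move together
          }
        }
        batch.clear();
      }
    }

As Chandni points out above, this only yields exactly-once if the content and order of batches within a replayed window are reproducible, which depends on the upstream operator.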
