Chandni: Good point on ordering.

We need to handle that case. Ordering is also not guaranteed when the
upstream operator has multiple instances.
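
A minimal sketch of the ordering concern, in plain Java rather than Malhar
code (the class and method names are made up for illustration). It assumes
recovery skips the first N tuples of a replayed window because N tuples were
persisted before the failure, which only works if the replay delivers the
tuples in the same order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReplayOrderDemo {
  /**
   * Replays a window on top of what is already stored, skipping the first
   * alreadyWritten tuples of the replay, and returns the resulting store contents.
   */
  static List<String> replay(List<String> stored, List<String> replayed, int alreadyWritten) {
    List<String> result = new ArrayList<>(stored);
    result.addAll(replayed.subList(alreadyWritten, replayed.size()));
    return result;
  }

  public static void main(String[] args) {
    // Two tuples of the window were persisted before the failure.
    List<String> stored = Arrays.asList("a", "b");

    // Same order on replay: the skip lines up with what was written.
    System.out.println(replay(stored, Arrays.asList("a", "b", "c", "d"), 2)); // [a, b, c, d]

    // Different order on replay: "b" is written twice and "c" is lost.
    System.out.println(replay(stored, Arrays.asList("c", "a", "b", "d"), 2)); // [a, b, b, d]
  }
}
```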

Regards,
Sandeep

On Tue, Dec 29, 2015 at 8:39 PM, Chandni Singh <[email protected]>
wrote:

> I agree with Sandeep that at-least-once with databases is not the right
> approach. All the output adapters we have in the library are written so
> that they do not write duplicate entries, for example FileOutputOperator.
> This has nothing to do with the processing mode feature offered by the
> platform. When writing to any external entity we have to ensure that we do
> not introduce duplicates.
>
> Priyanka,
> I am assuming that the batch id you mean here is the number of tuples in
> the app window that have been persisted. Otherwise a batch id has to be
> stored with every tuple, so that when a window is replayed you know which
> tuples to discard.
>
> We have followed that approach in some concrete implementations in POCs.
> One of the reasons that we haven't added that here is that it relies on the
> order of tuples within an application window. This again depends on the
> upstream operator. For example, in the default mode of dedup, the ordering
> of tuples within an application window is not guaranteed.
>
> Chandni
>
>
>
>
> On Tue, Dec 29, 2015 at 4:53 AM, Priyanka Gugale <[email protected]
> >
> wrote:
>
> > One more option:
> >
> > We can keep track of windowId and batchId, i.e. we save the batchId with
> > the windowId when we commit a batch within a window. For example, if we
> > are in window 10 and have written the second batch in window 10, we
> > commit windowId=10 and batchId=2 to the DB. During recovery we won't
> > reprocess batches within the last window that are marked as committed.
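
A minimal sketch of this windowId/batchId idea, using an in-memory stand-in
for the database. BatchMarkerStore and BatchingWriter are hypothetical names,
not Malhar classes, and the sketch assumes that a replayed window delivers
its tuples in the same order, so that batch N contains the same tuples on
every attempt:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a transactional store; not a Malhar class. */
class BatchMarkerStore {
  final List<String> rows = new ArrayList<>();
  long committedWindowId = -1;
  int committedBatchId = 0;

  /** Simulates one transaction: rows and the (windowId, batchId) marker commit together. */
  void commitBatch(long windowId, int batchId, List<String> batch) {
    rows.addAll(batch);
    committedWindowId = windowId;
    committedBatchId = batchId;
  }

  /** True if this batch was already written before a failure. */
  boolean isCommitted(long windowId, int batchId) {
    return windowId < committedWindowId
        || (windowId == committedWindowId && batchId <= committedBatchId);
  }
}

/** Hypothetical writer that splits a window into batches of at most maxBatchSize tuples. */
class BatchingWriter {
  private final BatchMarkerStore store;
  private final int maxBatchSize;
  private final List<String> batch = new ArrayList<>();
  private long windowId;
  private int batchId;

  BatchingWriter(BatchMarkerStore store, int maxBatchSize) {
    this.store = store;
    this.maxBatchSize = maxBatchSize;
  }

  void beginWindow(long windowId) {
    this.windowId = windowId;
    this.batchId = 0; // batches are numbered 1, 2, ... within each window
    batch.clear();
  }

  void process(String tuple) {
    batch.add(tuple);
    if (batch.size() >= maxBatchSize) {
      flush();
    }
  }

  void endWindow() {
    if (!batch.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    batchId++;
    // Skip batches that the store already holds from before the failure.
    if (!store.isCommitted(windowId, batchId)) {
      store.commitBatch(windowId, batchId, batch);
    }
    batch.clear();
  }
}

class BatchMarkerDemo {
  public static void main(String[] args) {
    BatchMarkerStore store = new BatchMarkerStore();
    BatchingWriter first = new BatchingWriter(store, 2);
    first.beginWindow(10);
    for (String t : new String[] {"a", "b", "c", "d"}) first.process(t);
    // Simulated crash here: window 10 is replayed from the start by a new instance.
    BatchingWriter recovered = new BatchingWriter(store, 2);
    recovered.beginWindow(10);
    for (String t : new String[] {"a", "b", "c", "d", "e"}) recovered.process(t);
    recovered.endWindow();
    System.out.println(store.rows); // [a, b, c, d, e]
  }
}
```

If the replay delivered the tuples in a different order, batches 1 and 2
would hold different tuples than the ones already committed, and skipping
them would no longer be safe.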
> >
> > -Priyanka
> >
> > On Tue, Dec 29, 2015 at 3:54 PM, Sandeep Deshmukh <
> [email protected]
> > >
> > wrote:
> >
> > > Not sure if "at least once" is the right behavior for databases. We may
> > > not always have a primary key to update or insert.
> > >
> > >
> > > Regards,
> > > Sandeep
> > >
> > > On Tue, Dec 29, 2015 at 2:23 PM, Priyanka Gugale <[email protected]>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your inputs, Chandni. I guess what you are suggesting is
> > > > similar to AbstractJdbcNonTransactionableBatchOutputOperator, which
> > > > is a batch, non-transactional operator. That is one good option.
> > > >
> > > > I am also thinking of a possibility of having "at least once"
> > > > behavior with a transactional store. In this, we keep committing
> > > > batches within a window. Each batch commit will be a transaction. On
> > > > recovery we start processing from the last committed window (we don't
> > > > exclude the last committed window, as it could be partially written).
> > > > If the queries are update or insert queries using a primary key, it
> > > > shouldn't be a problem if we replay the insert/update commands. They
> > > > will have the same effect on the database (of course this is not
> > > > applicable for all use cases). Does this look better?
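
A minimal sketch of why replay is harmless for keyed upserts but not for
plain inserts. The tables are in-memory stand-ins and UpsertReplayDemo is a
made-up name for illustration, not part of Malhar or any database API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UpsertReplayDemo {
  /** Upsert keyed by primary key: replaying the same batch leaves the table unchanged. */
  static void upsertAll(Map<Integer, String> table, Map<Integer, String> batch) {
    table.putAll(batch);
  }

  /** Plain insert with no key: every replay appends the rows again. */
  static void insertAll(List<String> table, List<String> batch) {
    table.addAll(batch);
  }

  public static void main(String[] args) {
    Map<Integer, String> batch = new LinkedHashMap<>();
    batch.put(1, "alice");
    batch.put(2, "bob");

    Map<Integer, String> keyed = new LinkedHashMap<>();
    upsertAll(keyed, batch);
    upsertAll(keyed, batch); // replay after a failure
    System.out.println(keyed); // {1=alice, 2=bob}

    List<String> unkeyed = new ArrayList<>();
    insertAll(unkeyed, new ArrayList<>(batch.values()));
    insertAll(unkeyed, new ArrayList<>(batch.values())); // replay after a failure
    System.out.println(unkeyed); // [alice, bob, alice, bob]
  }
}
```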
> > > >
> > > > -Priyanka
> > > >
> > > > On Tue, Dec 29, 2015 at 11:31 AM, Chandni Singh <
> > [email protected]
> > > >
> > > > wrote:
> > > >
> > > > > Yeah, I understand there is a problem in that the app window size is
> > > > > time based here, not based on the number of events. However, I don't
> > > > > think having a max batch size in this class will help, because that
> > > > > causes problems with saving the tuples exactly once and with
> > > > > idempotency.
> > > > >
> > > > > I was just trying to let you know why the batch transactional store
> > > > > is how it is.
> > > > >
> > > > > Check out the non-transactional store output operator
> > > > > (AbstractStoreOutputOperator) and its implementations, where the
> > > > > window id is saved with each update. I think having a batch extension
> > > > > of that can achieve what is needed here in a way that the operator
> > > > > will still be fault-tolerant and idempotent.
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > > On Mon, Dec 28, 2015 at 9:45 PM, Chinmay Kolhatkar <
> > > > > [email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi Chandni,
> > > > > >
> > > > > > I totally agree with you that the transactions should be
> > > > > > idempotent, and that needs to be taken care of if the batch size is
> > > > > > configurable.
> > > > > >
> > > > > > Though, I have a question related to the second part, where batch
> > > > > > size is controlled by controlling app window size.
> > > > > > I agree with you that the aggregation window is a unit of
> > > > > > aggregation provided by the platform. But, if I understand
> > > > > > correctly, that is time based.
> > > > > > If I want to aggregate based on number of tuples, would this be
> > > > > > suitable?
> > > > > >
> > > > > > To give you an example, let's say I have a store on which the
> > > > > > transaction size should never exceed 1000 operations.
> > > > > > In a streaming application it would be difficult to guess what the
> > > > > > input rate would be, hence it's not possible to guess how many
> > > > > > tuples will become part of a single application window. In such a
> > > > > > case, how can the application window size be used to configure the
> > > > > > transaction batch size?
> > > > > > Wouldn't it make more sense to have the control via an exact number
> > > > > > of tuples?
> > > > > >
> > > > > > Thanks,
> > > > > > Chinmay.
> > > > > >
> > > > > >
> > > > > > ~ Chinmay.
> > > > > >
> > > > > > On Tue, Dec 29, 2015 at 12:13 AM, Chandni Singh <
> > > > [email protected]
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Chinmay/Priyanka,
> > > > > > >
> > > > > > > We need to write tuples exactly once in the store. Please address
> > > > > > > the failure scenarios and how to achieve exactly-once and
> > > > > > > idempotency. I mentioned in my previous mail why multiple batches
> > > > > > > in a window is a problem with exactly-once.
> > > > > > >
> > > > > > > Control via app window would mean tuning the functionality by
> > > > > > > controlling the platform params. I think it's best one gets the
> > > > > > > option to separate the concerns of platform and that of app logic.
> > > > > > >
> > > > > > > Application window is a unit of aggregation. Every operator in a
> > > > > > > DAG can have a different application window, which is the support
> > > > > > > the platform provides for application logic.
> > > > > > >
> > > > > > > Chandni
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 28, 2015 at 10:35 AM, Chinmay Kolhatkar <
> > > > > > > [email protected]
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Just a thought on how it can possibly be done.
> > > > > > > >
> > > > > > > > The pseudo code might look like this:
> > > > > > > >
> > > > > > > > processTuple(tuple)
> > > > > > > > {
> > > > > > > >   // add the tuple to the batch first so it is never dropped
> > > > > > > >   batch.add(tuple);
> > > > > > > >   if (batch.size() >= configuredBatchSize) {
> > > > > > > >     // process the batch as a transaction
> > > > > > > >     // empty the data structure of batch.
> > > > > > > >   }
> > > > > > > > }
> > > > > > > >
> > > > > > > > endWindow()
> > > > > > > > {
> > > > > > > > // process the batch as transaction.
> > > > > > > > // empty the data structure of batch.
> > > > > > > > }
> > > > > > > >
> > > > > > > > This way, the user gets better, more direct control over what a
> > > > > > > > transaction means.
> > > > > > > >
> > > > > > > > As Chandni rightly said, one can reduce the application window
> > > > > > > > size for the operator, and that would reduce the batch size. But
> > > > > > > > that's not something which looks intuitive from the user's
> > > > > > > > perspective.
> > > > > > > > Control via app window would mean tuning the functionality by
> > > > > > > > controlling the platform params. I think it's best one gets the
> > > > > > > > option to separate the concerns of platform and that of app logic.
> > > > > > > >
> > > > > > > > If one wants to control the batch size, he/she should be able to
> > > > > > > > do that by just setting the batch size property (a number), and
> > > > > > > > not by changing the app window size (an indirect time unit).
> > > > > > > >
> > > > > > > > ~ Chinmay
> > > > > > > > On 28 Dec 2015 22:53, "Chandni Singh" <
> [email protected]
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > But you will not allow multiple batches in the same window?
> > > > > > > > > Can you please elaborate on failure scenarios and how it
> > > > > > > > > affects idempotency.
> > > > > > > > >
> > > > > > > > > Chandni
> > > > > > > > >
> > > > > > > > > On Mon, Dec 28, 2015 at 2:32 AM, Priyanka Gugale <
> > > > > > > > [email protected]
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > Sorry if I was not clear, but I am trying to propose the
> > > > > > > > > > MAX_SIZE per window which the operator could process. The
> > > > > > > > > > size could be less than the MAX_SIZE, no restriction about
> > > > > > > > > > that.
> > > > > > > > > >
> > > > > > > > > > -Priyanka
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 28, 2015 at 3:22 PM, Chandni Singh <
> > > > > > > > [email protected]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > How do you propose to restrict the number of tuples
> > > > > > > > > > > processed in an application window to less than the batch
> > > > > > > > > > > size?
> > > > > > > > > > >
> > > > > > > > > > > I don't see a way to enforce that the batch size can never
> > > > > > > > > > > be less than the number of tuples processed in an
> > > > > > > > > > > application window.
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 28, 2015 at 1:25 AM, Priyanka Gugale <
> > > > > > > [email protected]>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Chandni,
> > > > > > > > > > > >
> > > > > > > > > > > > How about restricting the tuples which can be processed
> > > > > > > > > > > > per window? If someone wants to process small and
> > > > > > > > > > > > frequent batches, he can set the batch size to some small
> > > > > > > > > > > > value and also reduce the window size. This would build
> > > > > > > > > > > > some back pressure of course. But that could be
> > > > > > > > > > > > acceptable if one really wants to restrict batch size.
> > > > > > > > > > > > The thought was triggered while working on the Cassandra
> > > > > > > > > > > > output operator. Cassandra has problems processing
> > > > > > > > > > > > batches of size greater than some value (don't recall the
> > > > > > > > > > > > exact number right now). Other databases may want to
> > > > > > > > > > > > restrict the batch size for similar or other reasons.
> > > > > > > > > > > >
> > > > > > > > > > > > -Priyanka
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:46 PM, Chandni Singh <
> > > > > > > > > > [email protected]>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Priyanka,
> > > > > > > > > > > > >
> > > > > > > > > > > > > AbstractBatchTransactionableStore treats all tuples in
> > > > > > > > > > > > > one application window as a batch because it needs to
> > > > > > > > > > > > > store the tuples in the store exactly once.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If there is more than one batch in an application
> > > > > > > > > > > > > window, then to store the tuples exactly once the
> > > > > > > > > > > > > window id needs to be written with every tuple as well,
> > > > > > > > > > > > > which is not that efficient. Therefore we take
> > > > > > > > > > > > > advantage of the transaction support by saving just the
> > > > > > > > > > > > > window id once (not with every tuple), but this
> > > > > > > > > > > > > necessitates all the tuples to be considered as a
> > > > > > > > > > > > > batch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Every operator in a DAG can have its own application
> > > > > > > > > > > > > window size. So to reduce the size per batch, the
> > > > > > > > > > > > > application window attribute needs to be modified.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Chandni
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Dec 28, 2015 at 1:01 AM, Chinmay Kolhatkar
> <
> > > > > > > > > > > > > [email protected]>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > +1 for this.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ~ Chinmay.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Dec 28, 2015 at 2:27 PM, Priyanka Gugale
> <
> > > > > > > > > > [email protected]>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > In Malhar we have an operator,
> > > > > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator,
> > > > > > > > > > > > > > > which creates batches based on tuples received in a
> > > > > > > > > > > > > > > window. At the end of the window these batches are
> > > > > > > > > > > > > > > sent to the database for processing.
> > > > > > > > > > > > > > > There is no way to configure MAX_SIZE on these
> > > > > > > > > > > > > > > batches. Based on input rate the batch sizes can
> > > > > > > > > > > > > > > grow very high, and we might want to restrict batch
> > > > > > > > > > > > > > > size.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Any operator can extend and do batch management on
> > > > > > > > > > > > > > > their own, but I see it as a generic requirement
> > > > > > > > > > > > > > > and IMO we should change the base class, i.e. the
> > > > > > > > > > > > > > > AbstractBatchTransactionableStoreOutputOperator
> > > > > > > > > > > > > > > class, to accept MAX_SIZE for batch from outside.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Any opinion on this?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
