I agree with Tim and Chandni that we should go to disk only when the output DB is unreachable or slow. As suggested, the best approach is a combination of AbstractReconciler and a WAL (spill to disk only when the in-memory queue limit is reached).

I can take up integrating the enhanced reconciler with the DB output operator. I can also help with using the WAL with AbstractReconciler.

-Priyanka

On Fri, Dec 18, 2015 at 4:06 AM, Ashwin Chandra Putta <[email protected]> wrote:

> I will send a PR for my first implementation soon.

On Thu, Dec 17, 2015 at 2:34 PM, Timothy Farkas <[email protected]> wrote:

> It looks like Ashwin has an initial implementation of a reconciler. Could
> we add that to Malhar and add WAL optimizations to it once the WAL is
> added to Malhar?

On Thu, Dec 17, 2015 at 1:31 PM, Chandni Singh <[email protected]> wrote:

> Pramod,
>
> Agreed, it can be done using the reconciler and optimizing it, but that
> means there is some work to be done in Malhar/library. We now have a
> ticket to address that work.
>
> Using a WAL to spool the tuples is still missing from Malhar/lib, which
> means the user needs to write more code.
>
> Thanks,
> Chandni

On Thu, Dec 17, 2015 at 1:07 PM, Ashwin Chandra Putta <[email protected]> wrote:

> Tim,
>
> I don't think there is an implementation in Malhar yet. I have an
> implementation in my fork that I sent you.
>
> Regards,
> Ashwin.

On Thu, Dec 17, 2015 at 12:09 PM, Timothy Farkas <[email protected]> wrote:

> Ashwin, is there an implementation of that in Malhar? I could only find
> an in-memory-only version:
>
> https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
>
> This in-memory implementation won't work in this use case, since
> committed may not be called for hours or a day, so data would be held in
> memory for a long time.
On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta <[email protected]> wrote:

> Tim,
>
> Are you saying HDFS is slower than a database? :)
>
> I think Reconciler is the best approach. The tuples need not be written
> to HDFS; they can be queued in memory. You can spool them to HDFS only
> when the queue reaches its limit. The reconciler solves a few major
> problems as you described above:
>
> 1. Graceful reconnection. When the external system we are writing to is
> down, the reconciler spools the messages to the queue and then to HDFS.
> The tuples are written to the external system only after it is back up
> again.
>
> 2. Handling surges. There will be cases when the throughput surges for
> some period and the external system is not fast enough to keep up with
> the writes. In those cases, the reconciler spools the incoming tuples to
> the queue/HDFS and then writes at the pace of the external system.
>
> 3. DAG slowdown. Again, in case of external system failure or a slow
> connection, we do not want to block the windows from moving forward. If
> the windows are blocked for a long time, STRAM will unnecessarily kill
> the operator. The reconciler makes sure that incoming messages are just
> queued/spooled to HDFS (the external system does not block the DAG), so
> the DAG is not slowed down.
>
> Regards,
> Ashwin.
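The "queue in memory, spool to disk only when the queue limit is reached" idea above can be sketched roughly as follows. This is a simplified standalone illustration, not the Malhar AbstractReconciler API: the class name `SpillingQueue` is hypothetical, and a local file stands in for the HDFS spill target.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch: a bounded in-memory queue that spills overflow
// to a local file, mirroring the spool-only-on-overflow idea.
public class SpillingQueue {
    private final int capacity;                        // in-memory limit
    private final Deque<String> memory = new ArrayDeque<>();
    private final Path spillFile;                      // stands in for an HDFS WAL
    private int spilled = 0;

    public SpillingQueue(int capacity, Path spillFile) {
        this.capacity = capacity;
        this.spillFile = spillFile;
    }

    // Enqueue a tuple; overflow goes to the spill file instead of memory.
    public void enqueue(String tuple) throws IOException {
        if (memory.size() < capacity) {
            memory.addLast(tuple);
        } else {
            Files.write(spillFile, (tuple + "\n").getBytes(),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            spilled++;
        }
    }

    public int spilledCount() { return spilled; }

    // Drain everything in arrival order: memory first, then the spill file.
    public List<String> drainAll() throws IOException {
        List<String> out = new ArrayList<>(memory);
        memory.clear();
        if (Files.exists(spillFile)) {
            out.addAll(Files.readAllLines(spillFile));
            Files.delete(spillFile);
        }
        return out;
    }
}
```

The real operator would drain this queue asynchronously toward the external system, so a slow or down database never blocks window progress.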
On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas <[email protected]> wrote:

> Yes, that is true Chandni, and considering how slow HDFS is, we should
> avoid writing to it if we can.
>
> It would be great if someone could pick up the ticket :).

On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh <[email protected]> wrote:

> +1 for Tim's suggestion.
>
> Using the reconciler means always writing to HDFS and then reading from
> it. Tim's suggestion is that we only write to HDFS when the database
> connection is down. This is analogous to spooling.
>
> Chandni

On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni <[email protected]> wrote:

> Tim, we have a pattern for this called Reconciler that Gaurav has also
> mentioned. There are some examples for it in Malhar.

On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas <[email protected]> wrote:

> Hi All,
>
> One of our users is outputting to Cassandra, but they want to handle a
> Cassandra failure or Cassandra downtime gracefully from an output
> operator. Currently a lot of our database operators will just fail and
> redeploy continually until the database comes back. This is a bad idea
> for a couple of reasons:
>
> 1 - We rely on buffer server spooling to prevent data loss. If the
> database is down for a long time (several hours or a day), we may run
> out of space to spool for the buffer server, since it spools to local
> disk and data is purged only after a window is committed. Furthermore,
> this buffer server problem will exist for all the Streaming Containers
> in the DAG, not just the one immediately upstream from the output
> operator, since data is spooled to disk for all operators and only
> removed once a window is committed.
>
> 2 - If there is another failure further upstream in the DAG, upstream
> operators will be redeployed to a checkpoint less than or equal to the
> checkpoint of the database operator in the at-least-once case. This
> could mean redoing several hours' or a day's worth of computation.
>
> We should support a mechanism to detect when the connection to a
> database is lost, spool to HDFS using a WAL, and then write the contents
> of the WAL into the database once it comes back online. This will save
> the local disk space of all the nodes used in the DAG and allow it to be
> used only for the data being output to the output operator.
>
> Ticket here if anyone is interested in working on it:
>
> https://malhar.atlassian.net/browse/MLHR-1951
>
> Thanks,
> Tim
