Pramod, Agreed it can be done using the reconciler and optimizing it but that means there is some work to be done in Malhar/library. We have a ticket now to address that work.
Using WAL to spool the tuples is all missing from Malhar/lib which means the user needs to write more code. Thanks, Chandni On Thu, Dec 17, 2015 at 1:07 PM, Ashwin Chandra Putta < [email protected]> wrote: > Tim, > > I don't think there is an implementation in Malhar yet. I have an > implementation in my fork that I sent you. > > Regards, > Ashwin. > > On Thu, Dec 17, 2015 at 12:09 PM, Timothy Farkas <[email protected]> > wrote: > > > Ashwin is there an implementation of that in Malhar? I could only find an > > in memory only version: > > > > > > > https://github.com/apache/incubator-apex-malhar/blob/devel-3/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java > > > > This in memory implementation won't work in this use case since committed > > may not be called for hours or a day so data will be held in memory for > > some time. > > > > On Thu, Dec 17, 2015 at 11:49 AM, Ashwin Chandra Putta < > > [email protected]> wrote: > > > > > Tim, > > > > > > Are you saying HDFS is slower than a database? :) > > > > > > I think Reconciler is the best approach. The tuples need not be written > > to > > > hdfs, they can be queued in memory. You can spool them to hdfs only > when > > it > > > reaches the limits of the queue. The reconciler solves a few major > > problems > > > as you described above. > > > > > > 1. Graceful reconnection. When the external system we are writing to is > > > down, the reconciler is spooling the messages to the queue and then to > > > hdfs. The tuples are written to the external system only after it is > back > > > up again. > > > 2. Handling surges. There will be cases when the throughput may get a > > > sudden surge for some period and the external system may not be fast > > enough > > > for the writes to it. In those cases, by using reconciler, we are > > spooling > > > the incoming tuples to queue/hdfs and then writing at the pace of > > external > > > system. > > > 3. Dag slowdown. Again in case of external system failure or slow > > > connection, we do not want to block the windows moving forward. If the > > > windows are blocked for a long time, then stram will unnecessarily kill > > the > > > operator. Reconciler makes sure that the incoming messages are just > > > queued/spooled to hdfs (external system is not blocking the dag), so > the > > > dag is not slowed down. > > > > > > Regards, > > > Ashwin. > > > > > > On Thu, Dec 17, 2015 at 11:29 AM, Timothy Farkas <[email protected]> > > > wrote: > > > > > > > Yes that is true Chandni, and considering how slow HDFS is we should > > > avoid > > > > writing to it if we can. > > > > > > > > It would be great if someone could pick up the ticket :). > > > > > > > > On Thu, Dec 17, 2015 at 11:17 AM, Chandni Singh < > > [email protected] > > > > > > > > wrote: > > > > > > > > > +1 for Tim's suggestion. > > > > > > > > > > Using reconciler employs always writing to HDFS and then read from > > > that. > > > > > Tim's suggestion is that we only write to hdfs when database > > connection > > > > is > > > > > down. This is analogous to spooling. > > > > > > > > > > Chandni > > > > > > > > > > On Thu, Dec 17, 2015 at 11:13 AM, Pramod Immaneni < > > > > [email protected]> > > > > > wrote: > > > > > > > > > > > Tim we have a pattern for this called Reconciler that Gaurav has > > also > > > > > > mentioned. There are some examples for it in Malhar > > > > > > > > > > > > On Thu, Dec 17, 2015 at 9:47 AM, Timothy Farkas < > > [email protected] > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi All, > > > > > > > > > > > > > > One of our users is outputting to Cassandra, but they want to > > > handle > > > > a > > > > > > > Cassandra failure or Cassandra down time gracefully from an > > output > > > > > > > operator. Currently a lot of our database operators will just > > fail > > > > and > > > > > > > redeploy continually until the database comes back. This is a > bad > > > > idea > > > > > > for > > > > > > > a couple of reasons: > > > > > > > > > > > > > > 1 - We rely on buffer server spooling to prevent data loss. If > > the > > > > > > database > > > > > > > is down for a long time (several hours or a day) we may run out > > of > > > > > space > > > > > > to > > > > > > > spool for buffer server since it spools to local disk, and data > > is > > > > > purged > > > > > > > only after a window is committed. Furthermore this buffer > server > > > > > problem > > > > > > > will exist for all the Streaming Containers in the dag, not > just > > > the > > > > > one > > > > > > > immediately upstream from the output operator, since data is > > > spooled > > > > to > > > > > > > disk for all operators and only removed for windows once a > window > > > is > > > > > > > committed. > > > > > > > > > > > > > > 2 - If there is another failure further upstream in the dag, > > > upstream > > > > > > > operators will be redeployed to a checkpoint less than or equal > > to > > > > the > > > > > > > checkpoint of the database operator in the At leas once case. > > This > > > > > could > > > > > > > mean redoing several hours or a day worth of computation. > > > > > > > > > > > > > > We should support a mechanism to detect when the connection to > a > > > > > database > > > > > > > is lost and then spool to hdfs using a WAL, and then write the > > > > contents > > > > > > of > > > > > > > the WAL into the database once it comes back online. This will > > save > > > > the > > > > > > > local disk space of all the nodes used in the dag and allow it > to > > > be > > > > > used > > > > > > > for only the data being output to the output operator. > > > > > > > > > > > > > > Ticket here if anyone is interested in working on it: > > > > > > > > > > > > > > https://malhar.atlassian.net/browse/MLHR-1951 > > > > > > > > > > > > > > Thanks, > > > > > > > Tim > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Regards, > > > Ashwin. > > > > > > > > > -- > > Regards, > Ashwin. >
