Gaurav, Sandesh,

PFB my comments in *bold*.
1. Are there standard APIs for distributed In-Memory stores or is this implementation specific to one particular tool?

*I have developed a concrete implementation with Apache Geode - http://geode.incubator.apache.org/. However, for this feature contribution I am adding a KeyValue store interface and an abstract implementation so that any KeyValue store can be plugged in as a storage agent (a rough sketch appears below, after these answers).*

2. Will the In-Memory Store compete with DataTorrent Apps for cluster resources (memory/cpu)?

*Probably not. The in-memory store would be a separately managed cluster which need not be part of the YARN environment.*

3. What is the purging policy? Who is responsible for cleaning up the resources for completed/failed/aborted applications? This becomes important when you want to launch an Application using a previous Application Id.

*The in-memory storage agent will support deleting checkpoints, which the platform calls periodically during the application lifetime.*

*Purging the checkpoints of older applications will be taken care of by the application developer or the admin managing the in-memory cluster; the same is the case with the HDFS storage agents, where users have to manually delete old application and checkpoint data.*

4. What all in-memory stores did you evaluate? Are they YARN compatible?

*I have a concrete implementation of a Geode storage agent which I will be contributing along with this feature.*
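To make the intent of answers 1 and 3 concrete, below is a rough sketch of what the abstract KeyValue-backed agent could look like. The class and method names (AbstractKeyValueStorageAgent, put/get/remove/listKeys) are illustrative placeholders and not the contributed code; only the save/load/delete/getWindowIds signatures come from the existing com.datatorrent.api.StorageAgent interface, and plain Java serialization is used purely to keep the sketch self-contained.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

import com.datatorrent.api.StorageAgent;

/**
 * Sketch only: an abstract agent that maps operator checkpoints onto a
 * generic key-value store. A concrete subclass (e.g. one backed by Geode)
 * only has to provide the primitive key-value operations.
 */
public abstract class AbstractKeyValueStorageAgent implements StorageAgent, Serializable
{
  private static final long serialVersionUID = 1L;

  // Checkpoints are namespaced by application id so that several applications
  // can share one store and old application data can be purged by key prefix.
  private final String applicationId;

  protected AbstractKeyValueStorageAgent(String applicationId)
  {
    this.applicationId = applicationId;
  }

  // Primitive operations the concrete store must provide.
  protected abstract void put(String key, byte[] value) throws IOException;
  protected abstract byte[] get(String key) throws IOException;
  protected abstract void remove(String key) throws IOException;
  protected abstract List<String> listKeys(String keyPrefix) throws IOException;

  // One entry per (application, operator, window).
  private String key(int operatorId, long windowId)
  {
    return applicationId + "/" + operatorId + "/" + windowId;
  }

  @Override
  public void save(Object object, int operatorId, long windowId) throws IOException
  {
    // Plain Java serialization keeps this sketch self-contained; the real agent
    // would more likely reuse the platform's existing checkpoint serialization.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(object);
    }
    put(key(operatorId, windowId), buffer.toByteArray());
  }

  @Override
  public Object load(int operatorId, long windowId) throws IOException
  {
    byte[] state = get(key(operatorId, windowId));
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
      return in.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  @Override
  public void delete(int operatorId, long windowId) throws IOException
  {
    // Invoked by the platform when committed windows make older checkpoints obsolete.
    remove(key(operatorId, windowId));
  }

  @Override
  public long[] getWindowIds(int operatorId) throws IOException
  {
    List<String> keys = listKeys(applicationId + "/" + operatorId + "/");
    long[] windowIds = new long[keys.size()];
    for (int i = 0; i < keys.size(); i++) {
      String k = keys.get(i);
      windowIds[i] = Long.parseLong(k.substring(k.lastIndexOf('/') + 1));
    }
    return windowIds;
  }
}

A concrete subclass (for example a hypothetical GeodeKeyValueStorageAgent) would then be plugged in exactly as in the snippet quoted further down in this thread, i.e. dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);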
Thanks,
Ashish

On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <[email protected]> wrote:

> Ashish,
>
> Two more questions for you:
> What all in-memory stores did you evaluate? Are they YARN compatible?
>
> Thank you for your contribution.
>
> Sandesh
>
> On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <[email protected]> wrote:
>
> > Ashish,
> >
> > I have a couple of questions:
> > 1. Are there standard APIs for distributed In-Memory stores or is this
> > implementation specific to one particular tool?
> > 2. Will the In-Memory Store compete with DataTorrent Apps for cluster
> > resources (memory/cpu)?
> > 3. What is the purging policy? Who is responsible for cleaning up the
> > resources for completed/failed/aborted applications? This becomes important
> > when you want to launch an Application using a previous Application Id.
> >
> > Thanks
> > - Gaurav
> >
> > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <[email protected]> wrote:
> > >
> > > Thanks Gaurav,
> > >
> > > I have finished a baseline implementation of the StorageAgent and have also
> > > tested it with demo applications by explicitly specifying it in the DAG
> > > configuration as below, and it works fine.
> > >
> > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > >
> > > I also had to make some changes to StramClient to pass additional
> > > information such as the applicationId, as it isn't passed currently.
> > >
> > > I am going to create a JIRA task for this feature and will document the
> > > design & implementation strategy there.
> > >
> > > Thx,
> > > Ashish
> > >
> > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]> wrote:
> > >
> > >> Just to add, you can plug in your storage agent using the attribute
> > >> STORAGE_AGENT (
> > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > >> )
> > >>
> > >> Thanks
> > >> - Gaurav
> > >>
> > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]> wrote:
> > >>>
> > >>> Ashish,
> > >>>
> > >>> You are right that Exactly-once semantics can't be achieved through async FS writes.
> > >>> Did you try the new StorageAgent with your application? If yes, do you
> > >>> have any numbers to compare?
> > >>>
> > >>> Thanks
> > >>> - Gaurav
> > >>>
> > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]> wrote:
> > >>>>
> > >>>> The application uses a large number of in-memory dimension store partitions
> > >>>> to hold high-cardinality aggregated data, and many intermediate operators
> > >>>> also keep cached data for reference lookups, which is not transient.
> > >>>>
> > >>>> The total number of application partitions was more than 1000, which means a
> > >>>> lot of operators to checkpoint and in turn a lot of frequent HDFS write,
> > >>>> rename & delete operations, which became the bottleneck.
> > >>>>
> > >>>> The application requires Exactly-once semantics with idempotent operators,
> > >>>> which I suppose cannot be achieved through async FS writes; please correct
> > >>>> me if I'm wrong here.
> > >>>>
> > >>>> Also, the application computes streaming aggregations of high-cardinality
> > >>>> incoming data streams and the reference caches are updated frequently, so
> > >>>> I'm not sure how much incremental checkpointing will help here.
> > >>>>
> > >>>> Regardless of this specific application, I strongly think it would be good
> > >>>> to have a StorageAgent backed by a distributed in-memory store as an
> > >>>> alternative in the platform.
> > >>>>
> > >>>> Ashish
> > >>>>
> > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]> wrote:
> > >>>>
> > >>>>> Ashish,
> > >>>>>
> > >>>>> In the current release, the HDFS writes are asynchronous, so I'm wondering
> > >>>>> if you could elaborate on how much latency you are observing both with and
> > >>>>> without checkpointing (i.e. after your changes to make operators stateless).
> > >>>>>
> > >>>>> Also, any information on how much non-transient data is being checkpointed
> > >>>>> in each operator would be useful. There is an effort under way to implement
> > >>>>> incremental checkpointing, which should improve things when there is a lot
> > >>>>> of state but very little that changes from window to window.
> > >>>>>
> > >>>>> Ram
> > >>>>>
> > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]> wrote:
> > >>>>>
> > >>>>>> Hi All,
> > >>>>>>
> > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS (with the
> > >>>>>> HDFS-backed StorageAgents, i.e. FSStorageAgent & AsyncFSStorageAgent).
> > >>>>>>
> > >>>>>> We have observed that for applications having a large number of operator
> > >>>>>> instances, HDFS checkpointing introduces latency in the DAG which degrades
> > >>>>>> overall application performance. To resolve this we had to review all
> > >>>>>> operators in the DAG and make a few operators stateless.
> > >>>>>>
> > >>>>>> As operator checkpointing is critical functionality of the Apex streaming
> > >>>>>> platform to ensure fault-tolerant behavior, the platform should also provide
> > >>>>>> alternate StorageAgents which work seamlessly with large applications that
> > >>>>>> require Exactly-once semantics.
> > >>>>>>
> > >>>>>> HDFS read/write latency is limited and doesn't improve beyond a certain
> > >>>>>> point because of disk IO & staging writes.
> > >>>>>> Having an alternate strategy for this
> > >>>>>> checkpointing in a fault-tolerant distributed in-memory grid would ensure
> > >>>>>> that application stability and performance are not impacted.
> > >>>>>>
> > >>>>>> I have developed an in-memory storage agent which I would like to
> > >>>>>> contribute as an alternate StorageAgent for checkpointing.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Ashish
