That’s cool, I will look into those new features. But do they provide automatic snapshot and restore?

Thanks
Edward

On 12/11/15, 12:18, "P. Taylor Goetz" <[email protected]> wrote:

>Storm 1.0 (which we hope to release in the next month or so) adds
>distributed cache/blobstore functionality that could be leveraged to
>solve a lot of the problems described in this thread. Another relevant
>feature is native windowing with persistent state (currently under
>development).
>
>Documentation of these features is a little light, but I’ll try to
>forward it on to this list when it’s more fully baked.
>
>-Taylor
>
>> On Dec 11, 2015, at 2:27 PM, Julian Hyde <[email protected]> wrote:
>>
>> State management of streams (including what I’d call “derived streams”)
>> is a hard distributed systems problem. Ideally it would be solved by the
>> stream provider, not by the Eagle project. I think you should talk to
>> the various streaming projects (Storm, Samza, Kafka, Flink) and find
>> out whether they can solve this, or whether it is on their roadmap.
>>
>> I can make introductions to the leaders of those projects if needed.
>>
>> If the problem is solved at the source, Eagle can focus on the actual
>> problem rather than on infrastructure.
>>
>> Julian
>>
>>
>>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <[email protected]> wrote:
>>>
>>> Great proposal. This is important and could serve as a general
>>> capability for both policies and analytics features.
>>>
>>> Periodically taking snapshots independently on each bolt would make
>>> state recoverable from recent history, but from the point of view of
>>> storing the whole topology's state, it cannot exactly handle the
>>> dependencies between bolt states.
>>>
>>> Another point: should state restore be triggered not only when the
>>> topology restarts but also when
>>> a. the topology is rebalanced, or
>>> b. the underlying stream framework moves a single bolt from one
>>> executor to another?
>>>
>>> Thanks,
>>> Ralph
>>>
>>>
>>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>>> [email protected]> wrote:
>>>
>>>> This topic has been discussed offline for a while, and it is time we
>>>> documented the problems and solutions. With clear problem statements
>>>> and proposed solutions, I believe we can do better before we start
>>>> implementing.
>>>>
>>>> [Problem Statement] For Eagle as a real-time big data monitoring
>>>> framework, evaluating policies efficiently is the core functionality.
>>>> Most meaningful policies are stateful, in that policy evaluation is
>>>> based not on a single event but on both previous events and the current
>>>> event. This potentially brings 2 fundamental problems: one is policy
>>>> state loss upon machine failure or topology restart, the other is the
>>>> lack of historical data upon a fresh start. One simple example is a
>>>> policy like “from userActivity[cmd==‘delete’]time.window(1 month)
>>>> select user, count() as cnt group by user having cnt > 1000”: if the
>>>> task is restarted, the accumulated user/count map is lost. Also, when
>>>> the topology is started for the first time, the window is empty even if
>>>> we have historical data in the database.
>>>>
>>>> [Proposed Solutions] The natural way of solving the above 2 problems is:
>>>> 1) Persist policy state periodically and restore it after the task is
>>>> restarted.
>>>> The underlying policy engine should support snapshot and restore
>>>> operations. Siddhi 3.x already supports snapshot and restore, though I
>>>> found some bugs in its state management:
>>>> https://wso2.org/jira/browse/CEP-1433
>>>> Restore is not that straightforward unless all input events to the
>>>> policy evaluator are backed by reliable, rewindable storage like Kafka.
>>>> If input events to the policy evaluator are backed by Kafka, then each
>>>> time Eagle takes a snapshot, we record the current offset along with it
>>>> and persist both to deep storage.
>>>> If input events to the policy evaluator are not backed by Kafka, then we
>>>> need to record every event since the last snapshot, which looks very
>>>> expensive. Apache Flink uses an efficient mechanism called stream
>>>> barriers to do group acknowledgement, but Storm does not have this
>>>> feature. Also remember that Apache Flink requires every task to take a
>>>> snapshot, not only the policy evaluator.
>>>>
>>>> 2) Transparently load historical data when the topology is started for
>>>> the first time.
>>>> If the policy evaluator accepts data which is already persisted in a
>>>> database or Kafka, we can provide an API to retrieve that data from the
>>>> database or Kafka. This loading is transparent to the developer, but the
>>>> developer/user needs to specify the deep storage holding the historical
>>>> data.
>>>>
>>>> Also be aware that once we have the right solution for the policy
>>>> evaluator, it should also be applied to event aggregation:
>>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>>>
>>>> A more aggressive approach is a solution similar to Flink's stream
>>>> barriers
>>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>> to take snapshots of all Eagle tasks (typically spouts and bolts) while
>>>> turning off Storm's ACK mechanism.
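
Edward's step 1 for Kafka-backed input (persist each snapshot together with the current offset, then restore the snapshot and replay from that offset), which is also the approach Suho recommends further down the thread, can be sketched as below. This is a minimal sketch under stated assumptions, not Eagle or Siddhi code: an in-memory `List<String>` stands in for a Kafka partition (the list index plays the role of the offset), the hypothetical `UserCounter` stands in for a stateful policy evaluator, and Java serialization stands in for whatever snapshot format the engine really uses. Its `snapshot()` and `restore(byte[])` only mirror the shape of the methods proposed for `StormStreamExecutor` in this message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class OffsetCheckpointSketch {

    // Hypothetical stand-in for a stateful policy evaluator: a running count per user.
    static class UserCounter {
        private HashMap<String, Long> counts = new HashMap<>();

        void process(String user) {
            counts.merge(user, 1L, Long::sum);
        }

        long count(String user) {
            return counts.getOrDefault(user, 0L);
        }

        // Serializes the whole state; mirrors the proposed `snapshot : Array[Byte]`.
        byte[] snapshot() throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(counts);
            }
            return bytes.toByteArray();
        }

        // Replaces (does not merge) the state; mirrors `restore(state: Array[Byte])`.
        @SuppressWarnings("unchecked")
        void restore(byte[] state) throws IOException, ClassNotFoundException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
                counts = (HashMap<String, Long>) in.readObject();
            }
        }
    }

    // What would go to deep storage: the snapshot plus the offset of the
    // first log entry that is NOT reflected in it.
    static final class Checkpoint {
        final byte[] state;
        final int nextOffset;

        Checkpoint(byte[] state, int nextOffset) {
            this.state = state;
            this.nextOffset = nextOffset;
        }
    }

    static Checkpoint checkpoint(UserCounter counter, int nextOffset) throws IOException {
        return new Checkpoint(counter.snapshot(), nextOffset);
    }

    // After a restart: restore the last checkpoint (if any), then replay the log
    // from its offset. With no checkpoint, replay starts at offset 0, i.e. from
    // the oldest event the log still retains.
    static UserCounter recover(Checkpoint last, List<String> log)
            throws IOException, ClassNotFoundException {
        UserCounter counter = new UserCounter();
        int from = 0;
        if (last != null) {
            counter.restore(last.state);
            from = last.nextOffset;
        }
        for (int offset = from; offset < log.size(); offset++) {
            counter.process(log.get(offset));
        }
        return counter;
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>(List.of("alice", "bob", "alice"));
        UserCounter live = new UserCounter();
        for (String user : log) {
            live.process(user);
        }
        Checkpoint cp = checkpoint(live, log.size()); // offsets 0..2 are in the snapshot
        log.add("alice");                              // offset 3 arrives after the snapshot
        live.process("alice");
        // Simulated failure: `live` is lost and state is rebuilt from cp + replay.
        UserCounter recovered = recover(cp, log);
        System.out.println(recovered.count("alice") + " " + recovered.count("bob")); // 3 1
    }
}
```

Replaying from offset 0 when no checkpoint exists is one way the same code path could also cover step 2 (loading history on a first start), assuming the log still retains every event the window needs; for a one-month window, that retention requirement is the hard part.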
>>>>
>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>   def prepareConfig(config : Config)
>>>>   def init
>>>>   def fields : Array[String]
>>>> }
>>>>
>>>> ==>
>>>>
>>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>>   def prepareConfig(config : Config)
>>>>   def init
>>>>   def fields : Array[String]
>>>>
>>>>   def snapshot : Array[Byte]
>>>>
>>>>   def restore(state: Array[Byte])
>>>> }
>>>>
>>>> This is very important to Eagle if we want it to be a fault-tolerant
>>>> monitoring framework.
>>>>
>>>> Thanks
>>>> Edward
>>>> From: Sriskandarajah Suhothayan <[email protected]>
>>>> Date: Thursday, December 10, 2015 at 9:30
>>>> To: "Zhang, Edward (GDI Hadoop)" <[email protected]>
>>>> Cc: "[email protected]" <[email protected]>, Edward Zhang
>>>> <[email protected]>, Srinath Perera <[email protected]>, WSO2
>>>> Developers' List <[email protected]>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>
>>>> Thanks for pointing it out,
>>>>
>>>> We are looking into this.
>>>> Will update you ASAP
>>>>
>>>> Suho
>>>>
>>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>>>> [email protected]> wrote:
>>>> By the way, we use Siddhi version 3.0.2.
>>>>
>>>> Also, to track this issue, I created the JIRA
>>>> https://wso2.org/jira/browse/CEP-1433 "snapshot/restore does not work
>>>> for aggregation on time based window".
>>>>
>>>> Thanks
>>>> Edward
>>>>
>>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <[email protected]> wrote:
>>>>
>>>>> Thanks for this suggestion, Suho.
>>>>>
>>>>> I did some testing on state persistence and restore; it looks like most
>>>>> use cases work except group by. I am not sure whether the Siddhi team
>>>>> already knows about this.
>>>>>
>>>>> Please look at my test case testTimeSlideWindowWithGroupby:
>>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>>>
>>>>> The query is like the following:
>>>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>>         + " select user, timeStamp, count() as cnt"
>>>>>         + " group by user"
>>>>>         + " having cnt > 2"
>>>>>         + " insert all events into outputStream;";
>>>>>
>>>>> The basic issue could be the following:
>>>>> 1) When taking a snapshot, it persists all Count executors per key. But
>>>>> it looks like Siddhi adds the same Count executor to the snapshot list
>>>>> multiple times (the count aggregator elementId is $planName+keyname).
>>>>> 2) When restoring a snapshot, it does not restore the Count executor for
>>>>> a key, because snapshotableList does not contain that key. The key is
>>>>> only generated when an event comes in, and at restoration time we don't
>>>>> know the previous events.
>>>>>
>>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>>> }
>>>>>
>>>>> Please let me know if there is an issue with my test program or
>>>>> something is wrong with Siddhi's group by/aggregator snapshot.
>>>>>
>>>>> Thanks
>>>>> Edward
>>>>>
>>>>> From: Sriskandarajah Suhothayan <[email protected]>
>>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>>> To: Edward Zhang <[email protected]>
>>>>> Cc: Srinath Perera <[email protected]>, WSO2 Developers' List
>>>>> <[email protected]>
>>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>>
>>>>> Hi
>>>>>
>>>>> Currently the concepts of current events and expired events live within
>>>>> the query, and all events going out to a stream are converted back to
>>>>> current events. So it's hard for the application to keep track of the
>>>>> window and of aggregation states like count, avg, std, etc.
>>>>> Further, window behavior can vary between implementations, so in some
>>>>> cases knowing which events entered and exited will not be enough to
>>>>> recreate the window after a failure.
>>>>>
>>>>> The recommended way to keep track of state in Siddhi is via snapshots:
>>>>> take snapshots of the Siddhi runtime at a reasonable interval, and also
>>>>> buffer a copy of the events sent to Siddhi after each snapshot. With
>>>>> this method, when there is a failure, we restore the latest snapshot
>>>>> and replay the events that were sent after it.
>>>>> The tricky part is how you buffer events after the snapshot; using
>>>>> Kafka and replaying is one option.
>>>>>
>>>>> Regards
>>>>> Suho
>>>>>
>>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang <[email protected]> wrote:
>>>>> I tried expired events before; it only works for queries without group
>>>>> by. If the query contains group by and a having clause, then it only
>>>>> emits expired events when the having condition is satisfied.
>>>>>
>>>>> For example
>>>>>
>>>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>>         + " select user, cmd, count(user) as cnt "
>>>>>         + " group by user "
>>>>>         + "having cnt > 3 "
>>>>>         + " insert all events into DelayedTempStream";
>>>>>
>>>>> If we send events as follows, it will not generate expired events at all.
>>>>>
>>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>>>
>>>>>
>>>>> Thanks
>>>>> Edward Zhang
>>>>>
>>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <[email protected]> wrote:
>>>>> Adding Suho
>>>>>
>>>>> Hi Edward,
>>>>>
>>>>> Each window gives you a stream of expired events as well. Would that
>>>>> work?
>>>>>
>>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>>>
>>>>> Thanks
>>>>> Srinath
>>>>>
>>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang <[email protected]> wrote:
>>>>> Hi Siddhi team,
>>>>>
>>>>> Do we have any way of tracking which events are removed from any type
>>>>> of window, length(batch) or time(batch)? From my investigation,
>>>>> removeEvents may not be the right solution.
>>>>>
>>>>> We have a requirement to migrate a policy from one machine to another
>>>>> while keeping its internal state.
>>>>>
>>>>> Eagle runs policies on Storm infrastructure, but if the machine holding
>>>>> a policy fails, the events already populated in the window are also
>>>>> gone. This is especially bad when we have built up a long window, like
>>>>> monthly data.
>>>>>
>>>>> One possible way is to snapshot all events in the Siddhi window into
>>>>> the application domain. Another way is to keep tracking which events
>>>>> come in and go out, so the application can track what is left in the
>>>>> Siddhi window.
>>>>>
>>>>> Here is the ticket for Eagle:
>>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>>>
>>>>> Have you had a similar request before? Or what do you suggest?
>>>>>
>>>>> Thanks
>>>>> Edward Zhang
>>>>>
>>>>> --
>>>>> ============================
>>>>> Srinath Perera, Ph.D.
>>>>> http://people.apache.org/~hemapani/
>>>>> http://srinathsview.blogspot.com/
>>>>>
>>>>> --
>>>>> S. Suhothayan
>>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>>> WSO2 Inc. http://wso2.com
>>>>> lean . enterprise . middleware
>>>>>
>>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>>> http://lk.linkedin.com/in/suhothayan
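
The group-by restore problem in Edward's Dec 8 message (point 2: a per-key Count executor exists only after an event for that key arrives, so a restore loop over the currently registered snapshotables finds nothing to restore after a restart) can be reproduced with a small, hypothetical Java model. It does not use Siddhi's classes: `GroupByCount`, `CountExecutor`, and both restore methods are illustrative names, only point 2 is modeled, and the thread does not say how CEP-1433 was eventually fixed. Driving the restore from the keys stored in the snapshot, rather than from the executors that happen to exist, is one plausible alternative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedRestoreSketch {

    // Hypothetical per-group executor, standing in for a per-key Count aggregator.
    static class CountExecutor {
        long count;
    }

    // Hypothetical group-by count that creates one executor per key lazily,
    // i.e. only when the first event for that key arrives.
    static class GroupByCount {
        private final Map<String, CountExecutor> executors = new HashMap<>();

        void process(String key) {
            executors.computeIfAbsent(key, k -> new CountExecutor()).count++;
        }

        long count(String key) {
            CountExecutor executor = executors.get(key);
            return executor == null ? 0L : executor.count;
        }

        // Snapshot keyed by group, so the keys themselves are part of the state.
        Map<String, Long> snapshot() {
            Map<String, Long> state = new HashMap<>();
            executors.forEach((key, executor) -> state.put(key, executor.count));
            return state;
        }

        // Mirrors the reported problem: only executors that already exist are
        // restored, and after a restart none exist yet, so all counts are lost.
        void restoreRegisteredOnly(Map<String, Long> state) {
            for (Map.Entry<String, CountExecutor> entry : executors.entrySet()) {
                Long saved = state.get(entry.getKey());
                if (saved != null) {
                    entry.getValue().count = saved;
                }
            }
        }

        // Alternative: iterate over the snapshot's keys and recreate executors.
        void restoreFromSnapshotKeys(Map<String, Long> state) {
            state.forEach((key, saved) ->
                    executors.computeIfAbsent(key, k -> new CountExecutor()).count = saved);
        }
    }

    public static void main(String[] args) {
        GroupByCount before = new GroupByCount();
        for (String user : List.of("u1", "u1", "u2")) {
            before.process(user);
        }
        Map<String, Long> snap = before.snapshot();

        GroupByCount naive = new GroupByCount(); // fresh instance, as after a restart
        naive.restoreRegisteredOnly(snap);
        GroupByCount fixed = new GroupByCount();
        fixed.restoreFromSnapshotKeys(snap);

        System.out.println(naive.count("u1") + " " + fixed.count("u1") + " " + fixed.count("u2")); // 0 2 1
    }
}
```

The design point the sketch isolates is that, for lazily created per-key state, the set of keys must itself be part of the snapshot; otherwise a freshly started runtime has no executors to hand the saved values to.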
