Storm 1.0 (which we hope to release in the next month or so) adds distributed cache/blobstore functionality that could be leveraged to solve many of the problems described in this thread. Another relevant feature is native windowing with persistent state (currently under development).
Documentation of these features is a little light, but I’ll try to forward it on to this list when it’s more fully baked.

-Taylor

> On Dec 11, 2015, at 2:27 PM, Julian Hyde <[email protected]> wrote:
>
> State management of streams (including what I’d call “derived streams”) is a
> hard distributed systems problem. Ideally it would be solved by the stream
> provider, not by the Eagle project. I think you should talk to the various
> streaming projects — Storm, Samza, Kafka, Flink — and find out whether they
> can solve this, or whether it is on their roadmap.
>
> I can make introductions to the leaders of those projects if needed.
>
> If the problem is solved at source, Eagle can focus on the actual problem
> rather than infrastructure.
>
> Julian
>
>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <[email protected]> wrote:
>>
>> Great proposal. This is important and could serve as a general capability
>> for both policy evaluation and analytic features.
>>
>> Periodically taking snapshots independently on each bolt would make state
>> recoverable from recent history, but from the point of view of the whole
>> topology’s state, it cannot capture inter-bolt state dependencies exactly.
>>
>> Another question: should the state restore be triggered not only when the
>> topology restarts, but also when
>> a. the topology is rebalanced?
>> b. the underlying streaming framework moves a single bolt from one
>> executor to another?
>>
>> Thanks,
>> Ralph
>>
>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <[email protected]> wrote:
>>
>>> This topic has been discussed offline for a while, and it is time we
>>> documented the problems and solutions. With clear problem statements and
>>> proposed solutions, I believe we can do better before we start implementing.
>>>
>>> [Problem Statement] For Eagle, as a real-time big data monitoring
>>> framework, evaluating policies efficiently is the core functionality.
>>> Most meaningful policies are stateful, in that policy evaluation is based
>>> not on a single event but on both previous events and the current event.
>>> This brings two fundamental problems: one is policy state loss upon
>>> machine failure or topology restart; the other is the lack of historical
>>> data upon a fresh start. A simple example is a policy like “from
>>> userActivity[cmd==‘delete’]#window.time(1 month) select user, count() as cnt
>>> group by user having cnt > 1000”: if the task is restarted, the state of
>>> the accumulated user/count map is lost. Also, when the topology is started
>>> for the first time, the window is empty even if we have historical data in
>>> the database.
>>>
>>> [Proposed Solutions] The natural way to solve the above two problems is:
>>> 1) persist policy state periodically, and restore policy state after the
>>> task is restarted.
>>> The underlying policy engine must support snapshot and restore operations.
>>> Siddhi 3.x already supports snapshot and restore, though I found some bugs
>>> in its state management:
>>> https://wso2.org/jira/browse/CEP-1433
>>> Restore is not straightforward unless all input events to the policy
>>> evaluator are backed by reliable, rewindable storage such as Kafka.
>>> If the input events to the policy evaluator are backed by Kafka, then each
>>> time Eagle takes a snapshot, we record the current offset together with it
>>> and persist both to deep storage.
>>> If the input events are not backed by Kafka, then we need to record every
>>> event since the last snapshot, which looks very expensive. Apache Flink
>>> uses an efficient algorithm called stream barriers to do group
>>> acknowledgement, but Storm does not have this feature. Note, though, that
>>> Flink requires every task to take snapshots, not only the policy evaluator.
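The offset-pairing idea above can be sketched as follows. This is a minimal illustration; the `Checkpoint` class and its byte layout are hypothetical, not Eagle’s actual API:

```java
// Sketch of solution 1: pair the policy-engine snapshot with the Kafka
// offset of the last event it covers, so restore can seek back and replay.
// The class name and encoding are illustrative only.
import java.nio.ByteBuffer;

public class Checkpoint {
    final long kafkaOffset;   // offset of the last event included in the snapshot
    final byte[] engineState; // opaque snapshot bytes from the policy engine

    Checkpoint(long kafkaOffset, byte[] engineState) {
        this.kafkaOffset = kafkaOffset;
        this.engineState = engineState;
    }

    // Serialize offset + state into one blob for deep storage (e.g. HDFS/HBase).
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(8 + engineState.length);
        buf.putLong(kafkaOffset);
        buf.put(engineState);
        return buf.array();
    }

    // Read the blob back; the caller would restore the engine from engineState
    // and seek the Kafka consumer to kafkaOffset + 1 before resuming.
    static Checkpoint decode(byte[] blob) {
        ByteBuffer buf = ByteBuffer.wrap(blob);
        long offset = buf.getLong();
        byte[] state = new byte[blob.length - 8];
        buf.get(state);
        return new Checkpoint(offset, state);
    }
}
```

The point of the sketch is only that the offset and the snapshot must be written atomically as one unit; if they are stored separately, a crash between the two writes leaves them inconsistent.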
>>>
>>> 2) transparently load historical data when the topology is started for
>>> the first time.
>>> If the policy evaluator accepts data that is already persisted in a
>>> database or Kafka, we can provide an API to retrieve it. This loading is
>>> transparent to the developer, but the developer/user needs to specify the
>>> deep storage holding the historical data.
>>>
>>> Also be aware that if we have the right solution for the policy evaluator,
>>> it should apply to event aggregation as well:
>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>>
>>> A more aggressive approach is a solution similar to Flink’s stream barriers
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>> to snapshot all Eagle tasks (typically spouts and bolts), turning off
>>> Storm’s ACK mechanism.
>>>
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config: Config)
>>>   def init
>>>   def fields: Array[String]
>>> }
>>>
>>> ==>
>>>
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config: Config)
>>>   def init
>>>   def fields: Array[String]
>>>   def snapshot: Array[Byte]
>>>   def restore(state: Array[Byte])
>>> }
>>>
>>> This is very important if we want Eagle to be a fault-tolerant monitoring
>>> framework.
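To make the proposed trait extension concrete, here is a hypothetical Java analogue showing how a simple per-user counting executor might implement `snapshot`/`restore`. Java serialization is used only for illustration; a real executor would delegate to the policy engine’s own snapshot bytes:

```java
// Hypothetical analogue of the extended StormStreamExecutor: a bolt-side
// executor that counts events per user and can capture/restore its state
// as an opaque byte array.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class CountingExecutor {
    private HashMap<String, Long> countsByUser = new HashMap<>();

    public void onEvent(String user) {
        countsByUser.merge(user, 1L, Long::sum);
    }

    public long count(String user) {
        return countsByUser.getOrDefault(user, 0L);
    }

    // Capture the executor's state as an opaque byte array.
    public byte[] snapshot() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(countsByUser);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Replace the current state with a previously captured snapshot.
    @SuppressWarnings("unchecked")
    public void restore(byte[] state) {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(state))) {
            countsByUser = (HashMap<String, Long>) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```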
>>>
>>> Thanks
>>> Edward
>>>
>>> From: Sriskandarajah Suhothayan <[email protected]>
>>> Date: Thursday, December 10, 2015 at 9:30
>>> To: "Zhang, Edward (GDI Hadoop)" <[email protected]>
>>> Cc: "[email protected]" <[email protected]>, Edward Zhang
>>> <[email protected]>, Srinath Perera <[email protected]>, WSO2
>>> Developers' List <[email protected]>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>
>>> Thanks for pointing it out,
>>>
>>> We are looking into this.
>>> Will update you ASAP
>>>
>>> Suho
>>>
>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <[email protected]> wrote:
>>> By the way, we use Siddhi version 3.0.2.
>>>
>>> Also, for tracking this issue I created the JIRA
>>> https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work for
>>> aggregation on a time-based window).
>>>
>>> Thanks
>>> Edward
>>>
>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <[email protected]> wrote:
>>>
>>>> Thanks for this suggestion, Suho.
>>>>
>>>> I did some testing on state persist and restore; most use cases work
>>>> except group by. I am not sure if the Siddhi team is aware of this.
>>>>
>>>> Please look at my test case: testTimeSlideWindowWithGroupby
>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>>
>>>> The query is like the following:
>>>>
>>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp, 3 sec)"
>>>>     + " select user, timeStamp, count() as cnt"
>>>>     + " group by user"
>>>>     + " having cnt > 2"
>>>>     + " insert all events into outputStream;";
>>>>
>>>> The basic issue could be the following:
>>>> 1) When taking a snapshot, Siddhi persists one Count executor per key,
>>>> but it looks like it adds the same Count executor to the snapshot list
>>>> multiple times (the count aggregator elementId is $planName + keyName).
>>>> 2) When restoring a snapshot, it does not restore the Count executor for
>>>> a key, because the snapshotableList does not contain that key. The key is
>>>> only generated when an event comes in, and at restoration time we don't
>>>> know the previous events.
>>>>
>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>> }
>>>>
>>>> Please let me know if there is an issue with my test program or if
>>>> something is wrong with Siddhi's group by/aggregator snapshot.
>>>>
>>>> Thanks
>>>> Edward
>>>>
>>>> From: Sriskandarajah Suhothayan <[email protected]>
>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>> To: Edward Zhang <[email protected]>
>>>> Cc: Srinath Perera <[email protected]>, WSO2 Developers' List <[email protected]>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>
>>>> Hi
>>>>
>>>> Currently the concept of current events and expired events lives within
>>>> the query, and all events going out to a stream are converted back to
>>>> current events. So it is hard for the application to keep track of the
>>>> window and of aggregation states like count, avg, std, etc.
>>>> Further, window behavior varies between implementations, so in some cases
>>>> knowing which events entered and exited will not be enough to recreate
>>>> the window after a failure.
>>>>
>>>> The recommended way to keep track of state in Siddhi is via snapshots:
>>>> take snapshots of the Siddhi runtime at a reasonable interval, and also
>>>> buffer a copy of the events sent to Siddhi after each snapshot. With this
>>>> method, when there is a failure we restore the latest snapshot and replay
>>>> the events sent after it. The tricky part is how you buffer events after
>>>> a snapshot; using Kafka and replaying is one option.
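Suho’s snapshot-plus-replay recipe can be sketched as follows. The `Engine` interface and the in-memory buffer are illustrative stand-ins: in practice the engine would be a Siddhi runtime and the buffer would live in Kafka rather than in process memory:

```java
// Sketch of the recommended recovery pattern: snapshot the engine
// periodically, buffer every event sent after the snapshot, and on failure
// restore the snapshot and replay the buffered tail.
import java.util.ArrayList;
import java.util.List;

public class SnapshotReplay {
    // Minimal stand-in for a snapshot-capable policy engine.
    interface Engine {
        void send(Object[] event);
        byte[] snapshot();
        void restore(byte[] state);
    }

    private final Engine engine;
    private byte[] lastSnapshot;
    private final List<Object[]> sinceSnapshot = new ArrayList<>();

    SnapshotReplay(Engine engine) {
        this.engine = engine;
        this.lastSnapshot = engine.snapshot();
    }

    void send(Object[] event) {
        engine.send(event);
        sinceSnapshot.add(event); // buffered until the next checkpoint
    }

    void checkpoint() {
        lastSnapshot = engine.snapshot();
        sinceSnapshot.clear();    // events before this point are covered by the snapshot
    }

    // After a crash: restore the last snapshot, then replay the buffered tail.
    void recover() {
        engine.restore(lastSnapshot);
        for (Object[] e : sinceSnapshot) {
            engine.send(e);
        }
    }
}
```

Note this gives at-least-once replay of the tail; exactly-once semantics additionally require that replayed events are deduplicated or that the engine’s operations are idempotent.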
>>>>
>>>> Regards
>>>> Suho
>>>>
>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang <[email protected]> wrote:
>>>> I tried expired events before; they only work for queries without group
>>>> by. If the query contains group by and having clauses, it only emits the
>>>> expired event when the having condition is satisfied.
>>>>
>>>> For example:
>>>>
>>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>>> String query = "@info(name = 'query1') from TempStream#window.length(4)"
>>>>     + " select user, cmd, count(user) as cnt"
>>>>     + " group by user"
>>>>     + " having cnt > 3"
>>>>     + " insert all events into DelayedTempStream";
>>>>
>>>> If we send events as follows, it will not generate expired events at all:
>>>>
>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>>
>>>> Thanks
>>>> Edward Zhang
>>>>
>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <[email protected]> wrote:
>>>> Adding Suho
>>>>
>>>> Hi Edward,
>>>>
>>>> Each window gives you a stream of expired events as well. Would that work?
>>>>
>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>>
>>>> Thanks
>>>> Srinath
>>>>
>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang <[email protected]> wrote:
>>>> Hi Siddhi team,
>>>>
>>>> Do we have any way of tracking which events are removed from any type of
>>>> window, length(batch) or time(batch)? I investigated removeEvents, and it
>>>> may not be the right solution.
>>>>
>>>> We have a requirement to migrate a policy from one machine to another
>>>> while keeping its internal state.
>>>>
>>>> Eagle runs policies on Storm infrastructure, but if the machine holding a
>>>> policy fails, the events already populated into the window are gone as
>>>> well. This is especially bad when we have built up a long window, such as
>>>> a month of data.
>>>>
>>>> One possible way is to snapshot all events in the Siddhi window into the
>>>> application domain. Another is to keep track of which events come in and
>>>> out, so the application can track what is left in the Siddhi window.
>>>>
>>>> Here is the ticket for Eagle:
>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>>
>>>> Have you had similar requests before? What do you suggest?
>>>>
>>>> Thanks
>>>> Edward Zhang
>>>>
>>>> _______________________________________________
>>>> Dev mailing list
>>>> [email protected]
>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>
>>>> --
>>>> ============================
>>>> Srinath Perera, Ph.D.
>>>> http://people.apache.org/~hemapani/
>>>> http://srinathsview.blogspot.com/
>>>>
>>>> --
>>>> S. Suhothayan
>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>> WSO2 Inc. http://wso2.com
>>>> lean . enterprise . middleware
>>>>
>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>> twitter: http://twitter.com/suhothayan | linked-in: http://lk.linkedin.com/in/suhothayan
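The idea in the last mail, keeping track of which events come in and out so the application knows what is left in the window, could be sketched as an application-side shadow of a length window. All names here are hypothetical:

```java
// Application-side mirror of a length window: by recording what enters and
// what would expire, the application always knows what the engine's window
// currently holds, and could rebuild it elsewhere after a policy migration.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ShadowLengthWindow {
    private final int length;
    private final Deque<Object[]> contents = new ArrayDeque<>();

    ShadowLengthWindow(int length) {
        this.length = length;
    }

    // Mirror the engine: record the arrival, evict the oldest when full.
    void onArrival(Object[] event) {
        if (contents.size() == length) {
            contents.removeFirst(); // this is the event the window would expire
        }
        contents.addLast(event);
    }

    // What we believe is still inside the engine's window.
    List<Object[]> snapshotContents() {
        return new ArrayList<>(contents);
    }
}
```

This mirroring only works for windows with deterministic eviction (length windows, external-time windows driven by event timestamps); as Suho notes above, for other window types the engine’s own snapshot is the only reliable source of state.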