State management of streams (including what I’d call “derived streams”) is a hard distributed systems problem. Ideally it would be solved by the stream provider, not by the Eagle project. I think you should talk to the various streaming projects — Storm, Samza, Kafka, Flink — and find out whether they can solve this, or whether it is on their roadmap.
I can make introductions to the leaders of those projects if needed. If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure. Julian > On Dec 10, 2015, at 7:48 PM, Liangfei.Su <[email protected]> wrote: > > Great proposal, this is important and could be general served for policy > capability and analytic feature. > > Periodically taken the snapshot independently on each bolt could make > status recoverable from recent history, but from whole topology store point > of view, this could not hand bolt status dependency exactly. > > Another point is should the state restore be triggered not only when > topology restarts but also when > a. topology re-balance > b. single bolt movement by underling stream framework for one executor to > another? > > Thanks, > Ralph > > > On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) < > [email protected]> wrote: > >> This topic has been discussed offline for a while and it is time we >> document problems and solutions. With clear problem statements and proposed >> solutions, I believe we can do better before we start implementing. >> >> [Problem Statement] For Eagle as real-time big data monitoring framework >> evaluating policies efficiently is the core functionality. Most of >> meaningful polices are stateful in that policy evaluation is not based on a >> single event but on both previous events and current event. This >> potentially brings 2 fundamental problems, one is policy state loss upon >> machine failures or topology restart, the other is lacking history data >> upon fresh start. One simple example is for a policy like “from >> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt >> group by user having cnt > 1000”, if the task is restarted, the state of >> accumulated user/count map is missing. Also when the topology is started at >> the first time, the window is empty even if we have historic data in >> database. >> >> [Proposed Solutions] The natural way of solving the above 2 problems is >> 1) do policy state persist periodically and restore policy state after >> task is restarted >> Underlying policy engine should support snapshot and restore operations. >> In Siddhi 3.x, it already supports snapshot and restore, though I found >> some bugs with their state management. >> https://wso2.org/jira/browse/CEP-1433 >> For restore, it is not that straight-forward unless all input events to >> policy evaluator are backed by a reliable and rewindable storage like Kafka. >> If input events to policy evaluator is backed by Kafka, then each time >> when EAGLE takes snapshot, we records the current offset together and >> persist both of them to deep storage. >> If input events to policy evaluator is not backed by Kafka, then we need >> record every event since last snapshot. That looks very expensive. Apache >> Flink uses efficient algorithm called stream barrier to do group >> acknowledgement, but in Storm we don’t have this feature. But remember >> Apache Flink requires that each task do snapshot not only for policy >> evaluator. >> >> 2) transparently load historical data when topology is started at the >> first time >> If policy evaluator accepts data which is already persisted in database or >> Kafka, we can provide API to retrieve data from database or Kafka. This >> loading is transparent to developer, but developer/user needs to specify >> the deep storage for storing historical data. >> >> Also be aware that if we have the right solution for policy evaluator, the >> solution should be also applied to event aggregation. >> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze >> >> Another aggressive way is to use Flink stream barrier similar solution >> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/ >> to take snapshot to all eagle tasks(typically spout and bolt) but turn off >> storm ACK mechanism. >> >> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], >> R] { >> def prepareConfig(config : Config) >> def init >> def fields : Array[String] >> } >> >> >> ==> >> >> >> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], >> R] { >> def prepareConfig(config : Config) >> def init >> def fields : Array[String] >> >> def snapshot : Array[Byte] >> >> def restore(state: Array[Byte]) >> } >> >> This is pretty much important to Eagle if we want Eagle to be a monitoring >> framework with fault-tolerance. >> >> Thanks >> Edward >> From: Sriskandarajah Suhothayan <[email protected]<mailto:[email protected]>> >> Date: Thursday, December 10, 2015 at 9:30 >> To: "Zhang, Edward (GDI Hadoop)" <[email protected]<mailto: >> [email protected]>> >> Cc: "[email protected]<mailto:[email protected]>" >> <[email protected]<mailto:[email protected]>>, >> Edward Zhang <[email protected]<mailto:[email protected]>>, >> Srinath Perera <[email protected]<mailto:[email protected]>>, WSO2 >> Developers' List <[email protected]<mailto:[email protected]>> >> Subject: Re: [Dev] [Siddhi] what events is left in the window >> >> Thanks for pointing it out, >> >> We are looking into this. >> Will update you ASAP >> >> Suho >> >> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) < >> [email protected]<mailto:[email protected]>> wrote: >> By the way, we use siddhi version 3.0.2. >> >> Also for tracking this issue, I created jira >> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for >> aggregation on time based window >> >> Thanks >> Edward >> >> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <[email protected]<mailto: >> [email protected]>> wrote: >> >>> Thanks for this suggestion, Suho. >>> >>> I did some testing on state persist and restore, looks most of use cases >>> are working except group by. I am not if Siddhi team has known this. >>> >>> Please look at my test cases : testTimeSlideWindowWithGroupby >>> >> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20d >>> f9a1f85758168efcb >>> >>> The query is like the following >>> String cseEventStream = "define stream testStream (timeStamp long, user >>> string, cmd string);"; >>> + String query = "@info(name = 'query1') from >>> testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)" >>> + + " select user, timeStamp, count() as cnt" >>> + + " group by user" >>> + + " having cnt > 2" >>> + + " insert all events into outputStream;"; >>> >>> The basic issue could be the following: >>> 1) when taking snapshot, it will persist all Count executor per key. But >>> looks Siddhi adds same Count executor into snapshot list multiple >>> times(The count aggregator elementId is $planName+keyname) >>> 2) when restoring snapshot, it will not restore the Count executor for >>> key because snopshotableList does not have the above key. That key only >>> is generated when event comes in. When do restoration, we don¹t know >>> previous events. >>> >>> for (Snapshotable snapshotable : snapshotableList) { >>> snapshotable.restoreState(snapshots.get(snapshotable.getElementId())); >>> } >>> >>> Please let me know if there is some issue with my test program or >>> something is wrong with Siddhi group by/aggregator snapshot >>> >>> Thanks >>> Edward >>> >>> From: Sriskandarajah Suhothayan <[email protected]<mailto:[email protected] >>> <mailto:[email protected]<mailto:[email protected]>>> >>> Date: Wednesday, November 25, 2015 at 21:07 >>> To: Edward Zhang <[email protected]<mailto:[email protected] >>> <mailto:[email protected]<mailto:[email protected]>>> >>> Cc: Srinath Perera <[email protected]<mailto:[email protected]><mailto: >> [email protected]<mailto:[email protected]>>>, WSO2 >>> Developers' List <[email protected]<mailto:[email protected]><mailto:[email protected] >> <mailto:[email protected]>>> >>> Subject: Re: [Dev] [Siddhi] what events is left in the window >>> >>> Hi >>> >>> Currently the concept of current event & expired events live within the >>> query and all events going out to a stream are converted back to current >>> events. So its hard for the application to keep track of the window and >>> aggregation states like count, avg, std, etc... >>> Further window implementations can very based on its implementations >>> hence in some cases knowing what events entered and existed will not be >>> enough to recreate the window during failure. >>> >>> The recommended way to keep track of state in Siddhi is via snapshots, >>> you can take snapshots of the siddhi Runtime with a reasonable time >>> frame. and also buffer a copy of the events sent to siddhi after that >>> snapshot, with this method when there is a failure we should restore the >>> latest snapshot and replay the events which are sent after the last >>> snapshot. The tricky part is the way you buffer events after snapshot, >>> using Kafka and replaying is one option. >>> >>> Regards >>> Suho >>> >>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang >>> <[email protected]<mailto:[email protected]><mailto: >> [email protected]<mailto:[email protected]>>> wrote: >>> I tried expired events before, it only works for the query without >>> groupby. If the query contains groupby and having clause, then it only >>> emit just expired event when having conditions is satisfied. >>> >>> For example >>> >>> String cseEventStream = "define stream TempStream (user string, cmd >>> string);"; >>> String query = "@info(name = 'query1') from TempStream#window.length(4) " >>> + " select user, cmd, count(user) as cnt " + >>> " group by user " + >>> "having cnt > 3 " >>> + " insert all events into DelayedTempStream"; >>> >>> If we send events as follows, it will not generate expired events at all. >>> >>> inputHandler.send(new Object[]{"user", "open1"}); >>> inputHandler.send(new Object[]{"user", "open2"}); >>> inputHandler.send(new Object[]{"user", "open3"}); >>> inputHandler.send(new Object[]{"user", "open4"}); >>> inputHandler.send(new Object[]{"user", "open5"}); >>> >>> >>> Thanks >>> Edward Zhang >>> >>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera >>> <[email protected]<mailto:[email protected]><mailto:[email protected] >> <mailto:[email protected]>>> wrote: >>> Adding Suho >>> >>> Hi Edward, >>> >>> Each window give you a stream of expired events as well. Would that work? >>> >>> >> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-W >>> indow >>> >>> Thank >>> Srinath >>> >>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang >>> <[email protected]<mailto:[email protected]><mailto: >> [email protected]<mailto:[email protected]>>> wrote: >>> Hi Siddhi team, >>> >>> Do we have anyway of tracking what events are removed from any type of >>> windows, length(batch), or time(batch)? I investigated that removeEvents >>> may not be the right solution. >>> >>> We have one requirement of migrating policy from one machine to another >>> machine but keeping internal state there. >>> >>> Eagle uses policy in storm infrastructure, but one machine which holds >>> the policy fails, then already-populated events in the window also are >>> gone. Sometimes it is bad especially when we have built up a long window >>> like monthly data. >>> >>> One possible way is to keep all events in the siddhi window to be >>> snapshotted into application domain. Another way is to keep tracking what >>> events are coming in and out, so application can track what are left in >>> siddhi window. >>> >>> Here is the ticket for Eagle >>> https://issues.apache.org/jira/browse/EAGLE-39 >>> >>> Do you have similar request before? Or what do you suggest? >>> >>> Thanks >>> Edward Zhang >>> >>> _______________________________________________ >>> Dev mailing list >>> [email protected]<mailto:[email protected]><mailto:[email protected]<mailto:[email protected] >>>> >>> http://wso2.org/cgi-bin/mailman/listinfo/dev >>> >>> >>> >>> >>> -- >>> ============================ >>> Srinath Perera, Ph.D. >>> http://people.apache.org/~hemapani/ >>> http://srinathsview.blogspot.com/ >>> >>> >>> >>> >>> -- >>> S. Suhothayan >>> Technical Lead & Team Lead of WSO2 Complex Event Processor >>> WSO2 Inc. http://wso2.com<http://wso2.com/> >>> <http://wso2.com/> >>> lean . enterprise . middleware >>> >>> cell: (+94) 779 756 757<tel:%28%2B94%29%20779%20756%20757> | blog: >> http://suhothayan.blogspot.com/ >>> twitter: http://twitter.com/suhothayan | linked-in: >>> http://lk.linkedin.com/in/suhothayan >> >> >> >> >> -- >> S. Suhothayan >> Technical Lead & Team Lead of WSO2 Complex Event Processor >> WSO2 Inc. http://wso2.com<http://wso2.com/> >> <http://wso2.com/> >> lean . enterprise . middleware >> >> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/ >> twitter: http://twitter.com/suhothayan | linked-in: >> http://lk.linkedin.com/in/suhothayan >>
