Hi Inosh,

Thanks for the fix. We have incorporated this into analytics-is.

Regards,
Mohan

On Tue, Jun 14, 2016 at 7:33 PM, Inosh Goonewardena <[email protected]> wrote:

> Hi All,
>
> We made a modification to the incremental data processing syntax and
> implementation in order to avoid inconsistencies that occur with time
> windows.
>
> For example, with the previous approach, if the timeWindow was specified
> as 3600 (or 86400), the incremental read start time was set to the hour
> (or day) boundary of UTC time. This is due to the fact that the
> incremental start time is calculated by performing a modulo operation on
> the timestamp. In most scenarios, analytics is implemented on a per
> <time unit> basis, i.e., we maintain summary tables per minute, per hour,
> per day, and per month. In these summary tables, we do windowing based on
> the local timezone of the server on which DAS runs [1]. Since the
> incremental time window starts at a UTC hour or day boundary,
> inconsistencies appear in the aggregate values of the time windows.
>
> With the newly introduced changes, the incremental window start time is
> set based on the local time zone to avoid the aforementioned
> inconsistencies. The syntax is as follows:
>
> CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options (tableName "test",
> incrementalProcessing "T1, WindowUnit, WindowOffset");
>
> The WindowUnit concept is similar to the previous TimeWindow concept, but
> here the corresponding window time unit needs to be provided instead of a
> time in millis. The supported units are SECOND, MINUTE, HOUR, DAY, MONTH,
> and YEAR.
>
> For example, let's say the server runs on IST time and the last processed
> event time is 1461111538669 (2016/04/20 05:48:58 AM IST). If the
> WindowUnit is set to DAY and the WindowOffset is set to 0 in the
> incremental table, the next script execution will read data starting from
> 1461090600000 (2016/04/20 12:00:00 AM IST).
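The timezone-aware window start calculation described above can be sketched as follows. This is a minimal illustration under our own naming (the function name and the `zoneinfo` usage are ours, not the actual DAS implementation), and it covers only a few of the supported units:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

def window_start_millis(last_processed_millis: int, unit: str, tz: str) -> int:
    """Floor an epoch-millis timestamp to the start of its window in a local timezone."""
    dt = datetime.fromtimestamp(last_processed_millis / 1000, ZoneInfo(tz))
    if unit == "DAY":
        dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    elif unit == "HOUR":
        dt = dt.replace(minute=0, second=0, microsecond=0)
    elif unit == "MINUTE":
        dt = dt.replace(second=0, microsecond=0)
    else:
        raise ValueError("unit not handled in this sketch: " + unit)
    return int(dt.timestamp() * 1000)

# Example from the thread: DAY window, server running on IST (Asia/Kolkata)
print(window_start_millis(1461111538669, "DAY", "Asia/Kolkata"))  # 1461090600000
```

For contrast, a UTC-based modulo floor (`ts - ts % 86400000`) maps the same timestamp to 1461110400000, i.e. 2016/04/20 00:00:00 UTC, which is 05:30 AM IST rather than local midnight; that mismatch is the inconsistency the change removes.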
> [1] [Analytics] Using UTC across all temporal analytics functions

On Fri, Mar 25, 2016 at 10:59 AM, Gihan Anuruddha <[email protected]> wrote:

>> Actually, in ESB-analytics we are currently using a similar mechanism.
>> There we keep 5 tables, one for each time unit (second, minute, hour,
>> day, month), and we have a windowOffset corresponding to each time unit,
>> e.g. minute: 60, hour: 3600, etc. We only store the min, max, sum, and
>> count values. So at any given time, if we need the average, we divide
>> the sum by the count.
>>
>> On Fri, Mar 25, 2016 at 8:47 AM, Inosh Goonewardena <[email protected]> wrote:
>>
>>> On Thu, Mar 24, 2016 at 9:56 PM, Sachith Withana <[email protected]> wrote:
>>>
>>>> Hi Inosh,
>>>>
>>>> We wouldn't have to do that, IMO.
>>>>
>>>> We can persist the total aggregate value up to currentTimeWindow -
>>>> WindowOffset, along with the previous time window's aggregation
>>>> metadata as well (count and sum, in the average aggregation case).
>>>
>>> Yep. That will do.
>>>
>>>> The previous total wouldn't be calculated again; it's the last two
>>>> time windows (including the current one) that we need to recalculate
>>>> and add to the previous total.
>>>>
>>>> It works almost the same way as the current incremental processing
>>>> table, but keeps more metadata on the aggregation-related values.
>>>>
>>>> @Anjana WDYT?
>>>>
>>>> Thanks,
>>>> Sachith
>>>>
>>>> On Fri, Mar 25, 2016 at 7:07 AM, Inosh Goonewardena <[email protected]> wrote:
>>>>
>>>>> Hi Sachith,
>>>>>
>>>>>> *WindowOffset*:
>>>>>>
>>>>>> Events might arrive late that belong to a previously processed time
>>>>>> window. To account for that, we have added an optional parameter
>>>>>> that allows processing the immediately previous time windows as
>>>>>> well (it acts like a buffer).
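The scheme described above (persisting only min, max, sum, and count per time window, and deriving the average on read) can be sketched as follows. This is a minimal in-memory illustration under our own naming, not the actual ESB-analytics tables; the point is that a late-arriving event only touches one window's stored values, after which the average is simply recomputed as sum/count:

```python
# Per-window aggregate record. Derived values (the average) are computed on
# read, so reprocessing a window just means updating min/max/sum/count.
windows = {}  # window_start_ts -> {"min": ..., "max": ..., "sum": ..., "count": ...}

def update(window_start: int, value: float) -> None:
    """Fold one event's value into its window's stored aggregates."""
    w = windows.setdefault(window_start,
                           {"min": value, "max": value, "sum": 0.0, "count": 0})
    w["min"] = min(w["min"], value)
    w["max"] = max(w["max"], value)
    w["sum"] += value
    w["count"] += 1

def average(window_start: int) -> float:
    """Derive the average on read from the stored sum and count."""
    w = windows[window_start]
    return w["sum"] / w["count"]

update(1461090600000, 10.0)
update(1461090600000, 20.0)  # e.g. a late event for an already-seen window
print(average(1461090600000))  # 15.0
```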
>>>>>> ex: If this is set to 1, apart from the to-be-processed data, the
>>>>>> data related to the previously processed time window will also be
>>>>>> taken for processing.
>>>>>
>>>>> This means that if a window offset is set, already processed data
>>>>> will be processed again. How do the aggregate functions work in this
>>>>> case? Actually, what is the plan for supporting aggregate functions?
>>>>> Let's take the average function as an example. Are we going to
>>>>> persist sum and count values per time window and re-calculate the
>>>>> whole average based on the values of all time windows? If so, I would
>>>>> guess we can update the previously processed time windows' sum and
>>>>> count values.
>>>>>
>>>>> On Thu, Mar 24, 2016 at 3:50 AM, Sachith Withana <[email protected]> wrote:
>>>>>
>>>>>> Hi Srinath,
>>>>>>
>>>>>> Please find the comments inline.
>>>>>>
>>>>>> On Thu, Mar 24, 2016 at 11:39 AM, Srinath Perera <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Sachith, Anjana,
>>>>>>>
>>>>>>> +1 for the backend model.
>>>>>>>
>>>>>>> Are we handling the case where the last run only processed 25
>>>>>>> minutes of data? Basically, the next run has to re-run the last
>>>>>>> hour and update the value.
>>>>>>
>>>>>> Yes. It will recalculate for that hour and will update the value.
>>>>>>
>>>>>>> When does the one-hour counting start? Is it from the moment the
>>>>>>> server starts? That will be probabilistic when you restart. I think
>>>>>>> we need to either start at a known place (midnight) or let the user
>>>>>>> configure it.
>>>>>>
>>>>>> In the first run, all the available data is processed.
>>>>>> After that, it calculates the floor of the last processed event's
>>>>>> timestamp (timestamp - timestamp % 3600), which is used as the start
>>>>>> of the time windows.
>>>>>>
>>>>>>> I am a bit concerned about the syntax, though.
>>>>>>> This only works for a very specific type of query (one that
>>>>>>> includes an aggregate and a group by). What happens if the user
>>>>>>> does this with a different query? Can we give a clear error
>>>>>>> message?
>>>>>>
>>>>>> Currently the error messages are very generic. We will have to work
>>>>>> on improving those messages.
>>>>>>
>>>>>> Thanks,
>>>>>> Sachith
>>>>>>
>>>>>>> --Srinath
>>>>>>>
>>>>>>> On Mon, Mar 21, 2016 at 5:15 PM, Sachith Withana <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> We are adding incremental processing capability to Spark in DAS.
>>>>>>>> As the first stage, we added time slicing to Spark execution.
>>>>>>>>
>>>>>>>> Here's a quick introduction to that.
>>>>>>>>
>>>>>>>> *Execution*:
>>>>>>>>
>>>>>>>> In the first run of the script, it will process all the data in
>>>>>>>> the given table and store the last processed event timestamp.
>>>>>>>> Then, from the next run onwards, it will start processing from
>>>>>>>> that stored timestamp.
>>>>>>>>
>>>>>>>> Until the query that contains the data processing part completes,
>>>>>>>> the last processed event timestamp is not overwritten with the
>>>>>>>> new value.
>>>>>>>> This is to ensure that the data processing for the query wouldn't
>>>>>>>> have to be done again if the whole query fails.
>>>>>>>> This is ensured by adding a commit query after the main query.
>>>>>>>> Refer to the Syntax section for the example.
>>>>>>>>
>>>>>>>> *Syntax*:
>>>>>>>>
>>>>>>>> In the Spark script, incremental processing support has to be
>>>>>>>> specified per table; this happens in the CREATE TEMPORARY TABLE
>>>>>>>> line.
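The commit step described in the Execution section above can be modeled as a two-phase update: the new last-processed timestamp is staged while the main query runs, and only becomes durable at the explicit commit. A minimal sketch (the class and method names are illustrative, not DAS internals):

```python
class IncrementalTable:
    """Tracks the last committed timestamp; a staged value is promoted only on commit."""

    def __init__(self):
        self.last_committed = 0  # last durably committed event timestamp
        self._staged = None      # high-water mark of the in-flight run

    def process(self, events):
        """Process events newer than the committed timestamp; stage the new high-water mark."""
        new_events = [e for e in events if e["ts"] > self.last_committed]
        if new_events:
            self._staged = max(e["ts"] for e in new_events)
        return new_events

    def commit(self):
        """Equivalent of the INC_TABLE_COMMIT step: make the staged timestamp durable."""
        if self._staged is not None:
            self.last_committed = self._staged
            self._staged = None

t = IncrementalTable()
t.process([{"ts": 100}, {"ts": 200}])
t.commit()
print(t.last_committed)  # 200
# If the script fails before commit(), last_committed keeps its old value,
# so the next run re-reads the same data instead of silently skipping it.
```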
>>>>>>>> ex:
>>>>>>>>
>>>>>>>> CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options (tableName "test",
>>>>>>>> incrementalProcessing "T1, 3600");
>>>>>>>>
>>>>>>>> INSERT INTO T2 SELECT username, age FROM T1 GROUP BY age;
>>>>>>>>
>>>>>>>> INC_TABLE_COMMIT T1;
>>>>>>>>
>>>>>>>> The last line is what ensures the processing took place
>>>>>>>> successfully and replaces the lastProcessed timestamp with the
>>>>>>>> new one.
>>>>>>>>
>>>>>>>> *TimeWindow*:
>>>>>>>>
>>>>>>>> To do the incremental processing, the user has to provide the
>>>>>>>> time window by which the data will be processed.
>>>>>>>> In the above example, the data would be summarized in *1 hour*
>>>>>>>> time windows.
>>>>>>>>
>>>>>>>> *WindowOffset*:
>>>>>>>>
>>>>>>>> Events might arrive late that belong to a previously processed
>>>>>>>> time window. To account for that, we have added an optional
>>>>>>>> parameter that allows processing the immediately previous time
>>>>>>>> windows as well (it acts like a buffer).
>>>>>>>> ex: If this is set to 1, apart from the to-be-processed data, the
>>>>>>>> data related to the previously processed time window will also be
>>>>>>>> taken for processing.
>>>>>>>>
>>>>>>>> *Limitations*:
>>>>>>>>
>>>>>>>> Currently, multiple time windows cannot be specified per
>>>>>>>> temporary table in the same script.
>>>>>>>> That would have to be done using different temporary tables.
>>>>>>>>
>>>>>>>> *Future Improvements*:
>>>>>>>> - Add aggregation function support for incremental processing
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sachith
>>>>>>>> --
>>>>>>>> Sachith Withana
>>>>>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>>>>>> E-mail: sachith AT wso2.com
>>>>>>>> M: +94715518127
>>>>>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>>>>
>>>>>>> --
>>>>>>> ============================
>>>>>>> Srinath Perera, Ph.D.
>>>>>>> http://people.apache.org/~hemapani/
>>>>>>> http://srinathsview.blogspot.com/
>>>>>>
>>>>>> --
>>>>>> Sachith Withana
>>>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>>>> E-mail: sachith AT wso2.com
>>>>>> M: +94715518127
>>>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>>>
>>>>>> _______________________________________________
>>>>>> Architecture mailing list
>>>>>> [email protected]
>>>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Inosh Goonewardena
>>>>> Associate Technical Lead - WSO2 Inc.
>>>>> Mobile: +94779966317
>>>>
>>>> --
>>>> Sachith Withana
>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>> E-mail: sachith AT wso2.com
>>>> M: +94715518127
>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Inosh Goonewardena
>>> Associate Technical Lead - WSO2 Inc.
>>> Mobile: +94779966317
>>
>> --
>> W.G. Gihan Anuruddha
>> Senior Software Engineer | WSO2, Inc.
>> M: +94772272595
>
> --
> Thanks & Regards,
>
> Inosh Goonewardena
> Associate Technical Lead - WSO2 Inc.
> Mobile: +94779966317

--
V. Mohanadarshan
Associate Tech Lead,
Data Technologies Team,
WSO2, Inc. http://wso2.com
lean.enterprise.middleware.
email: [email protected]
phone: (+94) 771117673
