Hi Inosh,

Thanks for the fix. We have incorporated this into analytics-is.

Regards,
Mohan


On Tue, Jun 14, 2016 at 7:33 PM, Inosh Goonewardena <[email protected]> wrote:

> Hi All,
>
> We made a modification to the incremental data processing syntax and
> implementation in order to avoid inconsistencies that occur with time windows.
>
> For example, with the previous approach, if the timeWindow is specified as
> 3600 (or 86400), the incremental read time gets set to the hour start
> time (or day start time) in UTC. This is because the incremental start
> time is calculated by performing a modulo operation on the timestamp. In
> most scenarios, analytics is implemented on a per-<time unit> basis,
> i.e., we maintain summary tables per minute, per hour, per day, and per
> month. In these summary tables, we do windowing based on the local
> timezone of the server on which DAS runs [1]. Since the incremental time
> window starts at a UTC hour or day boundary, inconsistencies appear in
> the aggregate values of the time windows.
>
> With the newly introduced changes, the incremental window start time is
> set based on the local time zone to avoid the aforementioned
> inconsistencies. The syntax is as follows.
>
> CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options (tableName "test",
> incrementalProcessing "T1, WindowUnit, WindowOffset");
>
>
> The WindowUnit concept is similar to the previous TimeWindow concept, but
> here the corresponding window time unit needs to be provided instead of a
> time in milliseconds. The supported units are SECOND, MINUTE, HOUR, DAY,
> MONTH, and YEAR.
>
> For example, let's say the server runs in IST and the last processed
> event time is 1461111538669 (2016/04/20 05:48:58 AM IST). If the
> WindowUnit is set to DAY and the WindowOffset is set to 0 in the
> incremental table, the next script execution will read data starting
> from 1461090600000 (2016/04/20 12:00:00 AM IST).
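[Editor's note: the DAY window start calculation described above can be sketched as follows. This is a hypothetical illustration (the helper name is invented, and IST is assumed to be a fixed UTC+5:30 offset), not the actual DAS implementation.]

```python
from datetime import datetime, timedelta, timezone

# Assumed fixed offset for IST (UTC+5:30); the real server would use
# its configured local time zone.
IST = timezone(timedelta(hours=5, minutes=30))

def day_window_start_ms(event_ts_ms, tz):
    """Floor a millisecond epoch timestamp to local midnight in tz."""
    dt = datetime.fromtimestamp(event_ts_ms / 1000, tz)
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp() * 1000)

print(day_window_start_ms(1461111538669, IST))  # 1461090600000
```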
>
> [1] [Analytics] Using UTC across all temporal analytics functions
>
>
>
> On Fri, Mar 25, 2016 at 10:59 AM, Gihan Anuruddha <[email protected]> wrote:
>
>> Actually, we are currently using a similar mechanism in ESB-analytics
>> as well. There we keep 5 tables, one for each time unit (second, minute,
>> hour, day, month), and we have a windowOffset corresponding to each time
>> unit, e.g., minute - 60, hour - 3600, etc. We store only the min, max,
>> sum, and count values. So, at a given time, if we need the average, we
>> divide the sum by the count.
>>
>> On Fri, Mar 25, 2016 at 8:47 AM, Inosh Goonewardena <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Thu, Mar 24, 2016 at 9:56 PM, Sachith Withana <[email protected]>
>>> wrote:
>>>
>>>> Hi Inosh,
>>>>
>>>> We wouldn't have to do that IMO.
>>>>
>>>> We can persist the total aggregate value up to currentTimeWindow -
>>>> WindowOffset, along with the previous time window's aggregation
>>>> metadata as well (count and sum in the average aggregation case).
>>>>
>>>
>>> Yep. That will do.
>>>
>>>>
>>>> The previous total wouldn't be calculated again; it's the last two
>>>> time windows (including the current one) that we need to recalculate
>>>> and add to the previous total.
>>>>
>>>> It works almost the same way as the current incremental processing
>>>> table, but keeps more metadata on the aggregation-related values.
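[Editor's note: a minimal sketch of the idea above, assuming hypothetical per-window records of count and sum; the names and in-memory dict are illustrative, not the DAS storage schema.]

```python
# Hypothetical per-window partial aggregates for AVG: each window
# stores (count, sum), so reprocessing a window only replaces its
# own entry, and the overall average is recomputed from all entries.
windows = {}  # window_start_ts -> (count, sum)

def reprocess_window(start_ts, values):
    # (re)processing a window replaces its partial aggregates
    windows[start_ts] = (len(values), sum(values))

def overall_average():
    total_count = sum(c for c, _ in windows.values())
    total_sum = sum(s for _, s in windows.values())
    return total_sum / total_count if total_count else None

reprocess_window(0, [10, 20])
reprocess_window(3600, [30])
print(overall_average())  # 20.0
# a late event arrives for the first window; only that window is redone
reprocess_window(0, [10, 20, 60])
print(overall_average())  # 30.0
```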
>>>>
>>>> @Anjana WDYT?
>>>>
>>>> Thanks,
>>>> Sachith
>>>>
>>>>
>>>> On Fri, Mar 25, 2016 at 7:07 AM, Inosh Goonewardena <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Sachith,
>>>>>
>>>>>
>>>>> *WindowOffset*:
>>>>>>
>>>>>> Events that belong to a previously processed time window might
>>>>>> arrive late. To account for that, we have added an optional
>>>>>> parameter that allows processing the immediately preceding time
>>>>>> windows as well (it acts like a buffer).
>>>>>> ex: If this is set to 1, apart from the to-be-processed data, data
>>>>>> related to the previously processed time window will also be taken
>>>>>> for processing.
>>>>>>
>>>>>
>>>>>
>>>>> This means that, if a window offset is set, already processed data
>>>>> will be processed again. How do the aggregate functions work in this
>>>>> case? Actually, what is the plan for supporting aggregate functions?
>>>>> Let's take the average function as an example. Are we going to
>>>>> persist sum and count values per time window and re-calculate the
>>>>> whole average based on the values of all time windows? If so, I would
>>>>> guess we can update the sum and count values of previously processed
>>>>> time windows.
>>>>>
>>>>>
>>>>> On Thu, Mar 24, 2016 at 3:50 AM, Sachith Withana <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Srinath,
>>>>>>
>>>>>> Please find the comments inline.
>>>>>>
>>>>>> On Thu, Mar 24, 2016 at 11:39 AM, Srinath Perera <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Sachith, Anjana,
>>>>>>>
>>>>>>> +1 for the backend model.
>>>>>>>
>>>>>>> Are we handling the case where, at the time the last run was done,
>>>>>>> only 25 minutes of data had been processed? Basically, the next run
>>>>>>> has to re-run the last hour and update the value.
>>>>>>>
>>>>>>
>>>>>> Yes. It will recalculate for that hour and will update the value.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> When does the one-hour counting start? Is it from the moment the
>>>>>>> server starts? That will be unpredictable when you restart. I think
>>>>>>> we need to either start from a known place (midnight) or let the
>>>>>>> user configure it.
>>>>>>>
>>>>>>
>>>>>> In the first run, all the available data is processed.
>>>>>> After that, it takes the last processed event's timestamp and
>>>>>> calculates its floor value (timestamp - timestamp % 3600), which
>>>>>> is used as the start of the time windows.
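[Editor's note: the floor calculation mentioned above as a one-line sketch, assuming epoch seconds and a 3600-second (hourly) window.]

```python
def window_floor(timestamp, window=3600):
    # start of the window that `timestamp` falls into (UTC-aligned)
    return timestamp - timestamp % window

print(window_floor(1461111538))  # 1461110400, the enclosing hour start
```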
>>>>>>
>>>>>>>
>>>>>>> I am a bit concerned about the syntax though. This only works for
>>>>>>> a very specific type of query (one that includes an aggregate and a
>>>>>>> group by). What happens if the user does this with a different
>>>>>>> query? Can we give a clear error message?
>>>>>>>
>>>>>>
>>>>>> Currently the error messages are very generic. We will have to work
>>>>>> on improving them.
>>>>>>
>>>>>> Thanks,
>>>>>> Sachith
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> --Srinath
>>>>>>>
>>>>>>> On Mon, Mar 21, 2016 at 5:15 PM, Sachith Withana <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> We are adding incremental processing capability to Spark in DAS.
>>>>>>>> As the first stage, we added time slicing to Spark execution.
>>>>>>>>
>>>>>>>> Here's a quick introduction into that.
>>>>>>>>
>>>>>>>> *Execution*:
>>>>>>>>
>>>>>>>> In the first run of the script, it processes all the data in the
>>>>>>>> given table and stores the last processed event timestamp.
>>>>>>>> From the next run onwards, it starts processing from that stored
>>>>>>>> timestamp.
>>>>>>>>
>>>>>>>> Until the query that contains the data processing part completes,
>>>>>>>> the last processed event timestamp is not overwritten with the new
>>>>>>>> value. This ensures that the data processing for the query does
>>>>>>>> not have to be redone if the whole query fails.
>>>>>>>> This is enforced by adding a commit query after the main query.
>>>>>>>> Refer to the Syntax section for an example.
>>>>>>>>
>>>>>>>> *Syntax*:
>>>>>>>>
>>>>>>>> In the Spark script, incremental processing support has to be
>>>>>>>> specified per table; this happens in the CREATE TEMPORARY TABLE
>>>>>>>> line.
>>>>>>>>
>>>>>>>> ex: CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options
>>>>>>>> (tableName "test",
>>>>>>>> incrementalProcessing "T1,3600");
>>>>>>>>
>>>>>>>> INSERT INTO T2 SELECT username, age FROM T1 GROUP BY age;
>>>>>>>>
>>>>>>>> INC_TABLE_COMMIT T1;
>>>>>>>>
>>>>>>>> The last line ensures that the processing took place successfully
>>>>>>>> and replaces the lastProcessed timestamp with the new one.
>>>>>>>>
>>>>>>>> *TimeWindow*:
>>>>>>>>
>>>>>>>> To do the incremental processing, the user has to provide the
>>>>>>>> time window in which the data will be processed.
>>>>>>>> In the above example, the data would be summarized in *1 hour* time
>>>>>>>> windows.
>>>>>>>>
>>>>>>>> *WindowOffset*:
>>>>>>>>
>>>>>>>> Events that belong to a previously processed time window might
>>>>>>>> arrive late. To account for that, we have added an optional
>>>>>>>> parameter that allows processing the immediately preceding time
>>>>>>>> windows as well (it acts like a buffer).
>>>>>>>> ex: If this is set to 1, apart from the to-be-processed data, data
>>>>>>>> related to the previously processed time window will also be taken
>>>>>>>> for processing.
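[Editor's note: the combined effect of TimeWindow and WindowOffset on where the next read starts can be sketched like this; the helper is hypothetical, assuming epoch seconds and the UTC-aligned floor described earlier in the thread.]

```python
def incremental_read_start(last_processed_ts, window=3600, offset=0):
    # floor to the current window start, then step back `offset`
    # whole windows so late-arriving events are reprocessed
    return last_processed_ts - last_processed_ts % window - offset * window

print(incremental_read_start(1461111538))            # 1461110400
print(incremental_read_start(1461111538, offset=1))  # 1461106800
```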
>>>>>>>>
>>>>>>>>
>>>>>>>> *Limitations*:
>>>>>>>>
>>>>>>>> Currently, multiple time windows cannot be specified per temporary
>>>>>>>> table in the same script.
>>>>>>>> It would have to be done using different temporary tables.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Future Improvements:*
>>>>>>>> - Add aggregation function support for incremental processing
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sachith
>>>>>>>> --
>>>>>>>> Sachith Withana
>>>>>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>>>>>> E-mail: sachith AT wso2.com
>>>>>>>> M: +94715518127
>>>>>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> ============================
>>>>>>> Srinath Perera, Ph.D.
>>>>>>>    http://people.apache.org/~hemapani/
>>>>>>>    http://srinathsview.blogspot.com/
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sachith Withana
>>>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>>>> E-mail: sachith AT wso2.com
>>>>>> M: +94715518127
>>>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Inosh Goonewardena
>>>>> Associate Technical Lead- WSO2 Inc.
>>>>> Mobile: +94779966317
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sachith Withana
>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>> E-mail: sachith AT wso2.com
>>>> M: +94715518127
>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Inosh Goonewardena
>>> Associate Technical Lead- WSO2 Inc.
>>> Mobile: +94779966317
>>>
>>>
>>>
>>
>>
>> --
>> W.G. Gihan Anuruddha
>> Senior Software Engineer | WSO2, Inc.
>> M: +94772272595
>>
>>
>>
>
>
> --
> Thanks & Regards,
>
> Inosh Goonewardena
> Associate Technical Lead- WSO2 Inc.
> Mobile: +94779966317
>



-- 
*V. Mohanadarshan*
*Associate Tech Lead,*
*Data Technologies Team,*
*WSO2, Inc. http://wso2.com*
*lean.enterprise.middleware.*

email: [email protected]
phone:(+94) 771117673
_______________________________________________
Architecture mailing list
[email protected]
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
