Hi Liangfei.Su,

This is the expected behavior of the external time window. Because it is a
sliding time window, you get an output for each incoming event. With the
group by clause, each output is the aggregate over the events that share the
same uuid and similarityId and fall within the last two minutes, as measured
by the timestamp attribute.
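
To make the per-event output concrete, here is a minimal plain-Java
simulation of a sliding two-minute window for a single (uuid, similarityId)
group. It is only a sketch and not Siddhi's implementation: the class and
method names are hypothetical, the timestamps are the four from the issue
written as milliseconds after 01:22:00.000, and events are assumed to arrive
in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; this does not use or reproduce Siddhi code.
class SlidingWindowSketch {
    static final long WINDOW_MS = 2 * 60 * 1000L;

    /** Returns one {startTime, endTime} pair per incoming event of one group. */
    static List<long[]> slide(long[] timestamps) {
        Deque<Long> window = new ArrayDeque<>();
        List<long[]> outputs = new ArrayList<>();
        for (long ts : timestamps) {
            // Expire events that are two minutes or more older than the new event.
            while (!window.isEmpty() && window.peekFirst() <= ts - WINDOW_MS) {
                window.pollFirst();
            }
            window.addLast(ts);
            // Recompute the aggregates over everything still in the window.
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (long t : window) {
                min = Math.min(min, t);
                max = Math.max(max, t);
            }
            // A sliding window emits on every arrival, so one output per event.
            outputs.add(new long[] {min, max});
        }
        return outputs;
    }

    public static void main(String[] args) {
        long[] ts = {7_579L, 8_314L, 15_017L, 187_017L};
        for (long[] out : slide(ts)) {
            System.out.println("startTime=" + out[0] + ", endTime=" + out[1]);
        }
    }
}
```

With these four events the sketch produces four outputs, one per event. The
fourth event arrives more than two minutes after the others, so its aggregate
covers only itself.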

If you need only one output per group, use the external time batch window
instead. Ensure that all the events arrive within the time duration that you
specify for the window.
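
As a rough sketch of that change: batchQuery() below is the query from the
issue with only the window swapped to externalTimeBatch, and batch() is a
simplified plain-Java simulation of batching for a single group. The class
and method names are hypothetical, the batch boundary handling is an
assumption rather than Siddhi's exact logic, and the timestamps are again
milliseconds after 01:22:00.000.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this does not use or reproduce Siddhi code.
class BatchWindowSketch {
    static final long WINDOW_MS = 2 * 60 * 1000L;

    /** Query text from the issue, with externalTime replaced by externalTimeBatch. */
    static String batchQuery() {
        return "from bootCorrelationStream[logLevel=='ERROR']"
                + "#window.externalTimeBatch(timestamp, 2 min) "
                + "select similarityId, adanos:first(message) as message, uuid, "
                + "min(timestamp) as startTime, max(timestamp) as endTime "
                + "group by uuid, similarityId "
                + "insert into tempStream;";
    }

    /** Returns one {startTime, endTime} pair per closed batch of one group. */
    static List<long[]> batch(long[] timestamps) {
        List<long[]> outputs = new ArrayList<>();
        boolean open = false;
        long batchEnd = 0;
        long min = 0;
        long max = 0;
        for (long ts : timestamps) {
            if (!open) {
                // Assumption: the first event's timestamp starts the first batch.
                open = true;
                batchEnd = ts + WINDOW_MS;
                min = ts;
                max = ts;
            } else if (ts >= batchEnd) {
                // An event past the batch end closes the batch: emit one aggregate.
                outputs.add(new long[] {min, max});
                while (ts >= batchEnd) {
                    batchEnd += WINDOW_MS;
                }
                min = ts;
                max = ts;
            } else {
                min = Math.min(min, ts);
                max = Math.max(max, ts);
            }
        }
        // The last batch stays buffered here until a later event closes it.
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(batchQuery());
        for (long[] out : batch(new long[] {7_579L, 8_314L, 15_017L, 187_017L})) {
            System.out.println("startTime=" + out[0] + ", endTime=" + out[1]);
        }
    }
}
```

In this sketch the first three events form one batch and yield a single
output once the fourth event arrives. The fourth event remains buffered until
a later event closes its batch, which is why the events of a group should
fall within one window duration.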

Regards,
Charini

On Fri, Jun 3, 2016 at 6:16 AM, Liangfei.Su <[email protected]> wrote:

> +WSO2 @DL
>
> Could someone qualified explain this?
>
> https://github.com/wso2/siddhi/issues/154
>
>
> Thanks,
> Ralph
>
>
> On Fri, Jun 3, 2016 at 8:21 AM, Garrett <[email protected]> wrote:
>
>> I am having problems with Siddhi group by with a time window. I am able to
>> get the correct aggregations for the group, but I receive one aggregated
>> result per event, not one per group.
>>
>> Here is the query:
>> define stream bootCorrelationStream (logLevel string, message string,
>>     similarityId string, timestamp long, uuid string);
>>
>> @info(name = 'bootCorrelation')
>> from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min)
>> select similarityId, adanos:first(message) as message,
>>     min(timestamp) as startTime, max(timestamp) as endTime
>> group by uuid, similarityId
>> insert into tempStream;
>>
>> Here is the output:
>> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
>> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
>> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
>> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>>
>> Here is the major code:
>> register callback
>> `
>> this.siddhiRuntime = new SiddhiRuntimeHolder();
>> this.siddhiRuntime.siddhiManager = new SiddhiManager();
>> ExecutionPlanRuntime executionPlanRuntime =
>>     this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
>> this.siddhiRuntime.inputHandler =
>>     executionPlanRuntime.getInputHandler(this.getStreamName());
>>
>> final SiddhiBolt siddhiBolt = this;
>> this.siddhiRuntime.queryCallback = new QueryCallback() {
>>     @Override
>>     public void receive(long timeStamp,
>>             org.wso2.siddhi.core.event.Event[] inEvents,
>>             org.wso2.siddhi.core.event.Event[] removeEvents) {
>>         siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>>     }
>> };
>> executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
>> executionPlanRuntime.start();
>>
>>
>> query details
>>
>> public static String generateExecutionPlan() {
>>     // make sure the field names are sorted
>>     StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
>>     executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
>>     executionPlane.append(" @info(name = 'bootCorrelation') ");
>>     // externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
>>     executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
>>     executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
>>     executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
>>     executionPlane.append(" group by uuid, similarityId ");
>>     executionPlane.append(" insert into tempStream; ");
>>     appendLogs(executionPlane.toString());
>>     return executionPlane.toString();
>> }
>>
>> callback details
>>
>> Map<String, List<BootCorrelationAggregationData>> tempMap =
>>     new HashMap<String, List<BootCorrelationAggregationData>>();
>> for (Event event : inEvents) {
>>     Object[] data = event.getData();
>>     BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
>>     aggregateData.setSimilarityId((String) data[0]);
>>     aggregateData.setMessage((String) data[1]);
>>     aggregateData.setUuid((String) data[2]);
>>     aggregateData.setTimestamp((Long) data[3]);
>>     if (!tempMap.containsKey(aggregateData.getUuid())) {
>>         tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
>>     }
>>     tempMap.get(aggregateData.getUuid()).add(aggregateData);
>> }
>>
>> List<BootCorrelationAggregationData> emitList =
>>     new ArrayList<BootCorrelationAggregationData>();
>> Iterator<Entry<String, List<BootCorrelationAggregationData>>> it =
>>     tempMap.entrySet().iterator();
>> while (it.hasNext()) {
>>     Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
>>     List<BootCorrelationAggregationData> temp = entry.getValue();
>>     Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {
>>         @Override
>>         public int compare(BootCorrelationAggregationData o1,
>>                 BootCorrelationAggregationData o2) {
>>             // Long.compare avoids the int overflow of subtracting long timestamps
>>             return Long.compare(o1.getTimestamp(), o2.getTimestamp());
>>         }
>>     });
>>
>>     if (temp.size() > 0) {
>>         String message = String.format("Count: %s, for %s", temp.size(),
>>                 ToStringBuilder.reflectionToString(temp.get(0)));
>>         LOG.info(message);
>>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>>         appendLogs(String.format("Count: %s; message=%s, timestamp=%s",
>>                 temp.size(), temp.get(0).getMessage(),
>>                 sdf.format(new Date(temp.get(0).getTimestamp()))));
>>         emitList.add(temp.get(0));
>>     }
>> }
>>
>> if (emitList.size() > 0) {
>>     this.getOutputCollector().emit(new Values(emitList));
>>     LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s",
>>             emitList.size()));
>>     for (BootCorrelationAggregationData data : emitList) {
>>         LOG.info(String.format("BootCorrelationSiddhiBolt emits %s",
>>                 ToStringBuilder.reflectionToString(data)));
>>     }
>> }
>>
>> `
>>
>> Input data:
>>
>> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21
>> 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>> logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21
>> 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21
>> 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>> logLevel="ERROR", message="ERROR fourth message", timestamp="2016-05-21
>> 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>
>>
>
>
> _______________________________________________
> Dev mailing list
> [email protected]
> http://wso2.org/cgi-bin/mailman/listinfo/dev
>
>


-- 
Charini Vimansha Nanayakkara
Software Engineer at WSO2
Mobile: 0714126293