+Garrett

Understand the output based on each input data. The question here is, the
count is always 1, i'm expecting count=1/2/3. e.g. when second input
message send in, the first input message should still kept in the windows,
so expect there should be two groups...





On Fri, Jun 3, 2016 at 11:14 AM, Charini Nanayakkara <[email protected]>
wrote:

> Hi Liangfei.Su,
>
> This is the expected behavior of external time window. Since it's a moving
> time window, you get an output for each incoming event. Due to the presence
> of the group by keyword, it would give an aggregate output by considering
> the events with the same uuid and similarityId, which arrived within last
> two minutes.
>
> If you need only one output per group use the external time batch window
> instead. Ensure that all the events come within the time duration which you
> specify here.
>
> Regards,
> Charini
>
> On Fri, Jun 3, 2016 at 6:16 AM, Liangfei.Su <[email protected]> wrote:
>
>> +WSO2 @DL
>>
>> Could someone eligible to explain?
>>
>> https://github.com/wso2/siddhi/issues/154
>>
>>
>> Thanks,
>> Ralph
>>
>>
>> On Fri, Jun 3, 2016 at 8:21 AM, Garrett <[email protected]> wrote:
>>
>>> Has problems with Siddhi group by with a time window, I am able to get
>>> the correct result aggregations for the group, but I receive one aggregated
>>> result per event, not one per group.
>>>
>>> Here is the query:
>>> define stream bootCorrelationStream (logLevel string, message string,
>>> similarityId string, timestamp long, uuid string); @info(name =
>>> 'bootCorrelation') from
>>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2
>>> min) select similarityId, adanos:first(message) as message, min(timestamp)
>>> as startTime, max(timestamp) as endTime group by uuid, similarityId insert
>>> into tempStream;
>>>
>>> Here is the output:
>>> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
>>> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
>>> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
>>> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>>>
>>> Here is the major code:
>>> register callback
>>> `
>>> this.siddhiRuntime = new SiddhiRuntimeHolder();
>>> this.siddhiRuntime.siddhiManager = new SiddhiManager();
>>> ExecutionPlanRuntime executionPlanRuntime =
>>> this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
>>> this.siddhiRuntime.inputHandler =
>>> executionPlanRuntime.getInputHandler(this.getStreamName());
>>>
>>>     final SiddhiBolt siddhiBolt = this;
>>>     this.siddhiRuntime.queryCallback = new QueryCallback() {
>>>         @Override
>>>         public void receive(long timeStamp, 
>>> org.wso2.siddhi.core.event.Event[] inEvents,
>>>                 org.wso2.siddhi.core.event.Event[] removeEvents) {
>>>             siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>>>         }
>>>     };
>>>     executionPlanRuntime.addCallback("query", 
>>> this.siddhiRuntime.queryCallback);
>>>     executionPlanRuntime.start();
>>>
>>>
>>> query details
>>>
>>> public static String generateExecutionPlan() {
>>> // make sure the fields' name sorted
>>> StringBuilder executionPlane = new StringBuilder(" define stream
>>> bootCorrelationStream ");
>>> executionPlane.append("(logLevel string, message string, similarityId
>>> string, timestamp long, uuid string); ");
>>> executionPlane.append(" @info(name = 'bootCorrelation') ");
>>> // externalTimeBatch(timestamp, 5 min), batch time window for specified
>>> timestamp
>>> executionPlane.append("from
>>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2
>>> min) ");
>>> executionPlane.append(" select similarityId, adanos:first(message) as
>>> message, uuid, ");
>>> executionPlane.append(" min(timestamp) as startTime, max(timestamp) as
>>> endTime ");
>>> executionPlane.append(" group by uuid, similarityId ");
>>> executionPlane.append(" insert into tempStream; ");
>>> appendLogs(executionPlane.toString());
>>> return executionPlane.toString();
>>> }
>>>
>>> callback details
>>>
>>> Map> tempMap = new HashMap>();
>>> for (Event event : inEvents) {
>>> Object[] data = event.getData();
>>> BootCorrelationAggregationData aggregateData = new
>>> BootCorrelationAggregationData();
>>> aggregateData.setSimilarityId((String) data[0]);
>>> aggregateData.setMessage((String) data[1]);
>>> aggregateData.setUuid((String) data[2]);
>>> aggregateData.setTimestamp((Long) data[3]);
>>> if (!tempMap.containsKey(aggregateData.getUuid())) {
>>> tempMap.put(aggregateData.getUuid(), new ArrayList());
>>> }
>>> tempMap.get(aggregateData.getUuid()).add(aggregateData);
>>> }
>>>
>>>     List<BootCorrelationAggregationData> emitList = new 
>>> ArrayList<BootCorrelationAggregationData>();
>>>     Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = 
>>> tempMap.entrySet().iterator();
>>>     while (it.hasNext()) {
>>>         Entry<String, List<BootCorrelationAggregationData>> entry = 
>>> it.next();
>>>         List<BootCorrelationAggregationData> temp = entry.getValue();
>>>         Collections.sort(temp, new 
>>> Comparator<BootCorrelationAggregationData>() {
>>>             @Override
>>>             public int compare(BootCorrelationAggregationData o1, 
>>> BootCorrelationAggregationData o2) {
>>>                 return (int) (o1.getTimestamp() - o2.getTimestamp());
>>>             }
>>>         });
>>>
>>>         if (temp.size() > 0) {
>>>             String message = String.format("Count: %s, for %s", 
>>> temp.size(), ToStringBuilder.reflectionToString(temp.get(0)));
>>>             LOG.info(message);
>>>             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
>>> HH:mm:ss.SSS");
>>>             appendLogs(String.format("Count: %s; message=%s, timestamp=%s", 
>>> temp.size(), temp.get(0).getMessage(), sdf.format(new 
>>> Date(temp.get(0).getTimestamp()))));
>>>             emitList.add(temp.get(0));
>>>         }
>>>     }
>>>
>>>     if (emitList.size() > 0) {
>>>         this.getOutputCollector().emit(new Values(emitList));
>>>         LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", 
>>> emitList.size()));
>>>         for (BootCorrelationAggregationData data : emitList) {
>>>             LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", 
>>> ToStringBuilder.reflectionToString(data)));
>>>         }
>>>     }
>>>
>>> `
>>>
>>> Input data:
>>>
>>> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21
>>> 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>> logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21
>>> 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21
>>> 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>> logLevel="ERROR", message="ERROR fourth message", timestamp="2016-05-21
>>> 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>>
>>> —
>>> You are receiving this because you are subscribed to this thread.
>>> Reply to this email directly, view it on GitHub
>>> <https://github.com/wso2/siddhi/issues/154>, or mute the thread
>>> <https://github.com/notifications/unsubscribe/AARjwAtgUjhGUE65W-7sSo1UuebELOvBks5qH3N1gaJpZM4ItGe0>
>>> .
>>>
>>
>>
>> _______________________________________________
>> Dev mailing list
>> [email protected]
>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>
>>
>
>
> --
> Charini Vimansha Nanayakkara
> Software Engineer at WSO2
> Mobile: 0714126293
>
>
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to