Garrett might provide more details.

There is format issue in github. Follow the code, basically, the output try
to read the event data into Map of instanceUUid -> List<Log of different
similarity> , then print the count of each instanceUuid.

See

String message = String.format("Count: %s; message=%s, timestamp=%s,
similarityId=%s,





On Fri, Jun 3, 2016 at 2:59 PM, Tishan Dahanayakage <[email protected]> wrote:

> Ralph,
>
> Does following code recide inside your siddhiBolt.queryCallback?
>
> Map> tempMap = new HashMap>();
> for (Event event : inEvents) {
> Object[] data = event.getData();
> BootCorrelationAggregationData aggregateData = new
> BootCorrelationAggregationData();
> aggregateData.setSimilarityId((String) data[0]);
> aggregateData.setMessage((String) data[1]);
> aggregateData.setUuid((String) data[2]);
> aggregateData.setTimestamp((Long) data[3]);
> if (!tempMap.containsKey(aggregateData.getUuid())) {
> tempMap.put(aggregateData.getUuid(), new ArrayList());
> }
> tempMap.get(aggregateData.getUuid()).add(aggregateData);
> }
>
> Thanks
> /Tishan
>
> On Fri, Jun 3, 2016 at 11:47 AM, Liangfei.Su <[email protected]> wrote:
>
>> +Garrett
>>
>> Understand the output based on each input data. The question here is, the
>> count is always 1, i'm expecting count=1/2/3. e.g. when second input
>> message send in, the first input message should still kept in the windows,
>> so expect there should be two groups...
>>
>>
>>
>>
>>
>> On Fri, Jun 3, 2016 at 11:14 AM, Charini Nanayakkara <[email protected]>
>> wrote:
>>
>>> Hi Liangfei.Su,
>>>
>>> This is the expected behavior of external time window. Since it's a
>>> moving time window, you get an output for each incoming event. Due to the
>>> presence of the group by keyword, it would give an aggregate output by
>>> considering the events with the same uuid and similarityId, which arrived
>>> within last two minutes.
>>>
>>> If you need only one output per group use the external time batch window
>>> instead. Ensure that all the events come within the time duration which you
>>> specify here.
>>>
>>> Regards,
>>> Charini
>>>
>>> On Fri, Jun 3, 2016 at 6:16 AM, Liangfei.Su <[email protected]>
>>> wrote:
>>>
>>>> +WSO2 @DL
>>>>
>>>> Could someone eligible to explain?
>>>>
>>>> https://github.com/wso2/siddhi/issues/154
>>>>
>>>>
>>>> Thanks,
>>>> Ralph
>>>>
>>>>
>>>> On Fri, Jun 3, 2016 at 8:21 AM, Garrett <[email protected]>
>>>> wrote:
>>>>
>>>>> Has problems with Siddhi group by with a time window, I am able to get
>>>>> the correct result aggregations for the group, but I receive one 
>>>>> aggregated
>>>>> result per event, not one per group.
>>>>>
>>>>> Here is the query:
>>>>> define stream bootCorrelationStream (logLevel string, message string,
>>>>> similarityId string, timestamp long, uuid string); @info(name =
>>>>> 'bootCorrelation') from
>>>>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2
>>>>> min) select similarityId, adanos:first(message) as message, min(timestamp)
>>>>> as startTime, max(timestamp) as endTime group by uuid, similarityId insert
>>>>> into tempStream;
>>>>>
>>>>> Here is the output:
>>>>> Count: 1; message=ERROR first message, timestamp=2016-05-21
>>>>> 01:22:07.579
>>>>> Count: 1; message=ERROR second message, timestamp=2016-05-21
>>>>> 01:22:08.314
>>>>> Count: 1; message=ERROR third message, timestamp=2016-05-21
>>>>> 01:22:15.017
>>>>> Count: 1; message=ERROR fourth message, timestamp=2016-05-21
>>>>> 01:25:07.017
>>>>>
>>>>> Here is the major code:
>>>>> register callback
>>>>> `
>>>>> this.siddhiRuntime = new SiddhiRuntimeHolder();
>>>>> this.siddhiRuntime.siddhiManager = new SiddhiManager();
>>>>> ExecutionPlanRuntime executionPlanRuntime =
>>>>> this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
>>>>> this.siddhiRuntime.inputHandler =
>>>>> executionPlanRuntime.getInputHandler(this.getStreamName());
>>>>>
>>>>>     final SiddhiBolt siddhiBolt = this;
>>>>>     this.siddhiRuntime.queryCallback = new QueryCallback() {
>>>>>         @Override
>>>>>         public void receive(long timeStamp, 
>>>>> org.wso2.siddhi.core.event.Event[] inEvents,
>>>>>                 org.wso2.siddhi.core.event.Event[] removeEvents) {
>>>>>             siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>>>>>         }
>>>>>     };
>>>>>     executionPlanRuntime.addCallback("query", 
>>>>> this.siddhiRuntime.queryCallback);
>>>>>     executionPlanRuntime.start();
>>>>>
>>>>>
>>>>> query details
>>>>>
>>>>> public static String generateExecutionPlan() {
>>>>> // make sure the fields' name sorted
>>>>> StringBuilder executionPlane = new StringBuilder(" define stream
>>>>> bootCorrelationStream ");
>>>>> executionPlane.append("(logLevel string, message string, similarityId
>>>>> string, timestamp long, uuid string); ");
>>>>> executionPlane.append(" @info(name = 'bootCorrelation') ");
>>>>> // externalTimeBatch(timestamp, 5 min), batch time window for
>>>>> specified timestamp
>>>>> executionPlane.append("from
>>>>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2
>>>>> min) ");
>>>>> executionPlane.append(" select similarityId, adanos:first(message) as
>>>>> message, uuid, ");
>>>>> executionPlane.append(" min(timestamp) as startTime, max(timestamp) as
>>>>> endTime ");
>>>>> executionPlane.append(" group by uuid, similarityId ");
>>>>> executionPlane.append(" insert into tempStream; ");
>>>>> appendLogs(executionPlane.toString());
>>>>> return executionPlane.toString();
>>>>> }
>>>>>
>>>>> callback details
>>>>>
>>>>> Map> tempMap = new HashMap>();
>>>>> for (Event event : inEvents) {
>>>>> Object[] data = event.getData();
>>>>> BootCorrelationAggregationData aggregateData = new
>>>>> BootCorrelationAggregationData();
>>>>> aggregateData.setSimilarityId((String) data[0]);
>>>>> aggregateData.setMessage((String) data[1]);
>>>>> aggregateData.setUuid((String) data[2]);
>>>>> aggregateData.setTimestamp((Long) data[3]);
>>>>> if (!tempMap.containsKey(aggregateData.getUuid())) {
>>>>> tempMap.put(aggregateData.getUuid(), new ArrayList());
>>>>> }
>>>>> tempMap.get(aggregateData.getUuid()).add(aggregateData);
>>>>> }
>>>>>
>>>>>     List<BootCorrelationAggregationData> emitList = new 
>>>>> ArrayList<BootCorrelationAggregationData>();
>>>>>     Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = 
>>>>> tempMap.entrySet().iterator();
>>>>>     while (it.hasNext()) {
>>>>>         Entry<String, List<BootCorrelationAggregationData>> entry = 
>>>>> it.next();
>>>>>         List<BootCorrelationAggregationData> temp = entry.getValue();
>>>>>         Collections.sort(temp, new 
>>>>> Comparator<BootCorrelationAggregationData>() {
>>>>>             @Override
>>>>>             public int compare(BootCorrelationAggregationData o1, 
>>>>> BootCorrelationAggregationData o2) {
>>>>>                 return (int) (o1.getTimestamp() - o2.getTimestamp());
>>>>>             }
>>>>>         });
>>>>>
>>>>>         if (temp.size() > 0) {
>>>>>             String message = String.format("Count: %s, for %s", 
>>>>> temp.size(), ToStringBuilder.reflectionToString(temp.get(0)));
>>>>>             LOG.info(message);
>>>>>             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd 
>>>>> HH:mm:ss.SSS");
>>>>>             appendLogs(String.format("Count: %s; message=%s, 
>>>>> timestamp=%s", temp.size(), temp.get(0).getMessage(), sdf.format(new 
>>>>> Date(temp.get(0).getTimestamp()))));
>>>>>             emitList.add(temp.get(0));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     if (emitList.size() > 0) {
>>>>>         this.getOutputCollector().emit(new Values(emitList));
>>>>>         LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", 
>>>>> emitList.size()));
>>>>>         for (BootCorrelationAggregationData data : emitList) {
>>>>>             LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", 
>>>>> ToStringBuilder.reflectionToString(data)));
>>>>>         }
>>>>>     }
>>>>>
>>>>> `
>>>>>
>>>>> Input data:
>>>>>
>>>>> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21
>>>>> 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>>>> logLevel="ERROR", message="ERROR second message",
>>>>> timestamp="2016-05-21 01:22:08.314",
>>>>> uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>>>> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21
>>>>> 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>>>> logLevel="ERROR", message="ERROR fourth message",
>>>>> timestamp="2016-05-21 01:25:07.017",
>>>>> uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>>>>>
>>>>> —
>>>>> You are receiving this because you are subscribed to this thread.
>>>>> Reply to this email directly, view it on GitHub
>>>>> <https://github.com/wso2/siddhi/issues/154>, or mute the thread
>>>>> <https://github.com/notifications/unsubscribe/AARjwAtgUjhGUE65W-7sSo1UuebELOvBks5qH3N1gaJpZM4ItGe0>
>>>>> .
>>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> Dev mailing list
>>>> [email protected]
>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>
>>>>
>>>
>>>
>>> --
>>> Charini Vimansha Nanayakkara
>>> Software Engineer at WSO2
>>> Mobile: 0714126293
>>>
>>>
>>
>> _______________________________________________
>> Dev mailing list
>> [email protected]
>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>
>>
>
>
> --
> Tishan Dahanayakage
> Software Engineer
> WSO2, Inc.
> Mobile:+94 716481328
>
> Disclaimer: This communication may contain privileged or other
> confidential information and is intended exclusively for the addressee/s.
> If you are not the intended recipient/s, or believe that you may have
> received this communication in error, please reply to the sender indicating
> that fact and delete the copy you received and in addition, you should not
> print, copy, re-transmit, disseminate, or otherwise use the information
> contained in this communication. Internet communications cannot be
> guaranteed to be timely, secure, error or virus-free. The sender does not
> accept liability for any errors or omissions.
>
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to