+Garrett Understand the output based on each input data. The question here is, the count is always 1, i'm expecting count=1/2/3. e.g. when second input message send in, the first input message should still kept in the windows, so expect there should be two groups...
On Fri, Jun 3, 2016 at 11:14 AM, Charini Nanayakkara <[email protected]> wrote: > Hi Liangfei.Su, > > This is the expected behavior of external time window. Since it's a moving > time window, you get an output for each incoming event. Due to the presence > of the group by keyword, it would give an aggregate output by considering > the events with the same uuid and similarityId, which arrived within last > two minutes. > > If you need only one output per group use the external time batch window > instead. Ensure that all the events come within the time duration which you > specify here. > > Regards, > Charini > > On Fri, Jun 3, 2016 at 6:16 AM, Liangfei.Su <[email protected]> wrote: > >> +WSO2 @DL >> >> Could someone eligible to explain? >> >> https://github.com/wso2/siddhi/issues/154 >> >> >> Thanks, >> Ralph >> >> >> On Fri, Jun 3, 2016 at 8:21 AM, Garrett <[email protected]> wrote: >> >>> Has problems with Siddhi group by with a time window, I am able to get >>> the correct result aggregations for the group, but I receive one aggregated >>> result per event, not one per group. >>> >>> Here is the query: >>> define stream bootCorrelationStream (logLevel string, message string, >>> similarityId string, timestamp long, uuid string); @info(name = >>> 'bootCorrelation') from >>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 >>> min) select similarityId, adanos:first(message) as message, min(timestamp) >>> as startTime, max(timestamp) as endTime group by uuid, similarityId insert >>> into tempStream; >>> >>> Here is the output: >>> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579 >>> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314 >>> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017 >>> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017 >>> >>> Here is the major code: >>> register callback >>> ` >>> this.siddhiRuntime = new SiddhiRuntimeHolder(); >>> this.siddhiRuntime.siddhiManager = new SiddhiManager(); >>> ExecutionPlanRuntime executionPlanRuntime = >>> this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan()); >>> this.siddhiRuntime.inputHandler = >>> executionPlanRuntime.getInputHandler(this.getStreamName()); >>> >>> final SiddhiBolt siddhiBolt = this; >>> this.siddhiRuntime.queryCallback = new QueryCallback() { >>> @Override >>> public void receive(long timeStamp, >>> org.wso2.siddhi.core.event.Event[] inEvents, >>> org.wso2.siddhi.core.event.Event[] removeEvents) { >>> siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents); >>> } >>> }; >>> executionPlanRuntime.addCallback("query", >>> this.siddhiRuntime.queryCallback); >>> executionPlanRuntime.start(); >>> >>> >>> query details >>> >>> public static String generateExecutionPlan() { >>> // make sure the fields' name sorted >>> StringBuilder executionPlane = new StringBuilder(" define stream >>> bootCorrelationStream "); >>> executionPlane.append("(logLevel string, message string, similarityId >>> string, timestamp long, uuid string); "); >>> executionPlane.append(" @info(name = 'bootCorrelation') "); >>> // externalTimeBatch(timestamp, 5 min), batch time window for specified >>> timestamp >>> executionPlane.append("from >>> bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 >>> min) "); >>> executionPlane.append(" select similarityId, adanos:first(message) as >>> message, uuid, "); >>> executionPlane.append(" min(timestamp) as startTime, max(timestamp) as >>> endTime "); >>> executionPlane.append(" group by uuid, similarityId "); >>> executionPlane.append(" insert into tempStream; "); >>> appendLogs(executionPlane.toString()); >>> return executionPlane.toString(); >>> } >>> >>> callback details >>> >>> Map> tempMap = new HashMap>(); >>> for (Event event : inEvents) { >>> Object[] data = event.getData(); >>> BootCorrelationAggregationData aggregateData = new >>> BootCorrelationAggregationData(); >>> aggregateData.setSimilarityId((String) data[0]); >>> aggregateData.setMessage((String) data[1]); >>> aggregateData.setUuid((String) data[2]); >>> aggregateData.setTimestamp((Long) data[3]); >>> if (!tempMap.containsKey(aggregateData.getUuid())) { >>> tempMap.put(aggregateData.getUuid(), new ArrayList()); >>> } >>> tempMap.get(aggregateData.getUuid()).add(aggregateData); >>> } >>> >>> List<BootCorrelationAggregationData> emitList = new >>> ArrayList<BootCorrelationAggregationData>(); >>> Iterator<Entry<String, List<BootCorrelationAggregationData>>> it = >>> tempMap.entrySet().iterator(); >>> while (it.hasNext()) { >>> Entry<String, List<BootCorrelationAggregationData>> entry = >>> it.next(); >>> List<BootCorrelationAggregationData> temp = entry.getValue(); >>> Collections.sort(temp, new >>> Comparator<BootCorrelationAggregationData>() { >>> @Override >>> public int compare(BootCorrelationAggregationData o1, >>> BootCorrelationAggregationData o2) { >>> return (int) (o1.getTimestamp() - o2.getTimestamp()); >>> } >>> }); >>> >>> if (temp.size() > 0) { >>> String message = String.format("Count: %s, for %s", >>> temp.size(), ToStringBuilder.reflectionToString(temp.get(0))); >>> LOG.info(message); >>> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd >>> HH:mm:ss.SSS"); >>> appendLogs(String.format("Count: %s; message=%s, timestamp=%s", >>> temp.size(), temp.get(0).getMessage(), sdf.format(new >>> Date(temp.get(0).getTimestamp())))); >>> emitList.add(temp.get(0)); >>> } >>> } >>> >>> if (emitList.size() > 0) { >>> this.getOutputCollector().emit(new Values(emitList)); >>> LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", >>> emitList.size())); >>> for (BootCorrelationAggregationData data : emitList) { >>> LOG.info(String.format("BootCorrelationSiddhiBolt emits %s", >>> ToStringBuilder.reflectionToString(data))); >>> } >>> } >>> >>> ` >>> >>> Input data: >>> >>> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21 >>> 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341" >>> logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 >>> 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341" >>> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21 >>> 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341" >>> logLevel="ERROR", message="ERROR fourth message", timestamp="2016-05-21 >>> 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341" >>> >>> — >>> You are receiving this because you are subscribed to this thread. >>> Reply to this email directly, view it on GitHub >>> <https://github.com/wso2/siddhi/issues/154>, or mute the thread >>> <https://github.com/notifications/unsubscribe/AARjwAtgUjhGUE65W-7sSo1UuebELOvBks5qH3N1gaJpZM4ItGe0> >>> . >>> >> >> >> _______________________________________________ >> Dev mailing list >> [email protected] >> http://wso2.org/cgi-bin/mailman/listinfo/dev >> >> > > > -- > Charini Vimansha Nanayakkara > Software Engineer at WSO2 > Mobile: 0714126293 > >
_______________________________________________ Dev mailing list [email protected] http://wso2.org/cgi-bin/mailman/listinfo/dev
