+WSO2 @DL Could someone knowledgeable please explain this?
https://github.com/wso2/siddhi/issues/154

Thanks,
Ralph

On Fri, Jun 3, 2016 at 8:21 AM, Garrett <notificati...@github.com> wrote:

> Has problems with Siddhi group by with a time window, I am able to get the
> correct result aggregations for the group, but I receive one aggregated
> result per event, not one per group.
>
> Here is the query:
>
> define stream bootCorrelationStream (logLevel string, message string,
>     similarityId string, timestamp long, uuid string);
>
> @info(name = 'bootCorrelation')
> from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min)
> select similarityId, adanos:first(message) as message,
>     min(timestamp) as startTime, max(timestamp) as endTime
> group by uuid, similarityId
> insert into tempStream;
>
> Here is the output:
>
> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>
> Here is the major code:
>
> register callback
> `
> this.siddhiRuntime = new SiddhiRuntimeHolder();
> this.siddhiRuntime.siddhiManager = new SiddhiManager();
> ExecutionPlanRuntime executionPlanRuntime =
>     this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
> this.siddhiRuntime.inputHandler =
>     executionPlanRuntime.getInputHandler(this.getStreamName());
>
> final SiddhiBolt siddhiBolt = this;
> this.siddhiRuntime.queryCallback = new QueryCallback() {
>     @Override
>     public void receive(long timeStamp,
>             org.wso2.siddhi.core.event.Event[] inEvents,
>             org.wso2.siddhi.core.event.Event[] removeEvents) {
>         siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>     }
> };
> executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
> executionPlanRuntime.start();
>
> query details
>
> public static String generateExecutionPlan() {
>     // make sure the fields' name sorted
>     StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
>     executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
>     executionPlane.append(" @info(name = 'bootCorrelation') ");
>     // externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
>     executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
>     executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
>     executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
>     executionPlane.append(" group by uuid, similarityId ");
>     executionPlane.append(" insert into tempStream; ");
>     appendLogs(executionPlane.toString());
>     return executionPlane.toString();
> }
>
> callback details
>
> Map<String, List<BootCorrelationAggregationData>> tempMap =
>     new HashMap<String, List<BootCorrelationAggregationData>>();
> for (Event event : inEvents) {
>     Object[] data = event.getData();
>     BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
>     aggregateData.setSimilarityId((String) data[0]);
>     aggregateData.setMessage((String) data[1]);
>     aggregateData.setUuid((String) data[2]);
>     aggregateData.setTimestamp((Long) data[3]);
>     if (!tempMap.containsKey(aggregateData.getUuid())) {
>         tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
>     }
>     tempMap.get(aggregateData.getUuid()).add(aggregateData);
> }
>
> List<BootCorrelationAggregationData> emitList =
>     new ArrayList<BootCorrelationAggregationData>();
> Iterator<Entry<String, List<BootCorrelationAggregationData>>> it =
>     tempMap.entrySet().iterator();
> while (it.hasNext()) {
>     Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
>     List<BootCorrelationAggregationData> temp = entry.getValue();
>     Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {
>         @Override
>         public int compare(BootCorrelationAggregationData o1,
>                 BootCorrelationAggregationData o2) {
>             return (int) (o1.getTimestamp() - o2.getTimestamp());
>         }
>     });
>
>     if (temp.size() > 0) {
>         String message = String.format("Count: %s, for %s", temp.size(),
>             ToStringBuilder.reflectionToString(temp.get(0)));
>         LOG.info(message);
>         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>         appendLogs(String.format("Count: %s; message=%s, timestamp=%s",
>             temp.size(), temp.get(0).getMessage(),
>             sdf.format(new Date(temp.get(0).getTimestamp()))));
>         emitList.add(temp.get(0));
>     }
> }
>
> if (emitList.size() > 0) {
>     this.getOutputCollector().emit(new Values(emitList));
>     LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", emitList.size()));
>     for (BootCorrelationAggregationData data : emitList) {
>         LOG.info(String.format("BootCorrelationSiddhiBolt emits %s",
>             ToStringBuilder.reflectionToString(data)));
>     }
> }
> `
>
> Input data:
>
> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR fourth message", timestamp="2016-05-21 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
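
A note on the behaviour described in the quoted issue, offered as a guess rather than a confirmed answer: externalTime is a sliding window, so as far as I understand Siddhi recomputes and emits the aggregate each time an event arrives. That would explain one result per event instead of one per group. The externalTimeBatch window mentioned in the quoted code comment may be closer to what is wanted, but I have not checked it against this Siddhi version. If the sliding window has to stay, one option is to collapse the per-event rows on the consumer side. The sketch below is plain Java and does not call Siddhi at all. The class LatestPerGroup, its method, the column positions, and the sample values (including the similarityId "sim-1", which does not appear in the issue) are all hypothetical. It assumes rows arrive in emission order, in the column order of the select clause from generateExecutionPlan(), so that a later row for the same group supersedes an earlier one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Siddhi: keeps only the most recent
// aggregate row for each (uuid, similarityId) group.
final class LatestPerGroup {

    // Assumed column order, taken from the select clause in the quoted
    // generateExecutionPlan(): similarityId, message, uuid, startTime, endTime.
    static final int SIMILARITY_ID_COL = 0;
    static final int UUID_COL = 2;

    private LatestPerGroup() {
    }

    static List<Object[]> latestPerGroup(List<Object[]> rows) {
        // LinkedHashMap keeps groups in the order they were first seen.
        Map<String, Object[]> latest = new LinkedHashMap<String, Object[]>();
        for (Object[] row : rows) {
            String key = row[UUID_COL] + "|" + row[SIMILARITY_ID_COL];
            latest.put(key, row); // a later row replaces the earlier one
        }
        return new ArrayList<Object[]>(latest.values());
    }

    public static void main(String[] args) {
        // Illustrative rows only: three running aggregates for one group,
        // as a sliding window might emit them one event at a time.
        List<Object[]> rows = new ArrayList<Object[]>();
        rows.add(new Object[] {"sim-1", "ERROR first message", "u-1", 1000L, 1000L});
        rows.add(new Object[] {"sim-1", "ERROR first message", "u-1", 1000L, 2000L});
        rows.add(new Object[] {"sim-1", "ERROR first message", "u-1", 1000L, 3000L});

        List<Object[]> groups = latestPerGroup(rows);
        System.out.println(rows.size() + " rows in, " + groups.size() + " group(s) out");
    }
}
```

Since each callback invocation in the quoted output appears to carry a single event, the rows would have to be buffered across invocations before a helper like this is applied. When to flush that buffer is a separate design decision not covered here.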
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev