+WSO2 @DL

Could someone qualified please explain this?

https://github.com/wso2/siddhi/issues/154


Thanks,
Ralph


On Fri, Jun 3, 2016 at 8:21 AM, Garrett <notificati...@github.com> wrote:

> I am having a problem with Siddhi group by combined with a time window. I
> get the correct aggregations for the group, but I receive one aggregated
> result per event, not one per group.
>
> Here is the query:
> define stream bootCorrelationStream (logLevel string, message string,
>     similarityId string, timestamp long, uuid string);
>
> @info(name = 'bootCorrelation')
> from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min)
> select similarityId, adanos:first(message) as message,
>     min(timestamp) as startTime, max(timestamp) as endTime
> group by uuid, similarityId
> insert into tempStream;
>
> Here is the output:
> Count: 1; message=ERROR first message, timestamp=2016-05-21 01:22:07.579
> Count: 1; message=ERROR second message, timestamp=2016-05-21 01:22:08.314
> Count: 1; message=ERROR third message, timestamp=2016-05-21 01:22:15.017
> Count: 1; message=ERROR fourth message, timestamp=2016-05-21 01:25:07.017
>
> Here is the main code:
> registering the callback
> `
> this.siddhiRuntime = new SiddhiRuntimeHolder();
> this.siddhiRuntime.siddhiManager = new SiddhiManager();
> ExecutionPlanRuntime executionPlanRuntime =
>     this.siddhiRuntime.siddhiManager.createExecutionPlanRuntime(this.getExecutionPlan());
> this.siddhiRuntime.inputHandler =
>     executionPlanRuntime.getInputHandler(this.getStreamName());
>
> final SiddhiBolt siddhiBolt = this;
> this.siddhiRuntime.queryCallback = new QueryCallback() {
>     @Override
>     public void receive(long timeStamp, org.wso2.siddhi.core.event.Event[] inEvents,
>             org.wso2.siddhi.core.event.Event[] removeEvents) {
>         siddhiBolt.queryCallback(timeStamp, inEvents, removeEvents);
>     }
> };
> executionPlanRuntime.addCallback("query", this.siddhiRuntime.queryCallback);
> executionPlanRuntime.start();
>
>
> query details
>
> public static String generateExecutionPlan() {
>     // make sure the fields' name sorted
>     StringBuilder executionPlane = new StringBuilder(" define stream bootCorrelationStream ");
>     executionPlane.append("(logLevel string, message string, similarityId string, timestamp long, uuid string); ");
>     executionPlane.append(" @info(name = 'bootCorrelation') ");
>     // externalTimeBatch(timestamp, 5 min), batch time window for specified timestamp
>     executionPlane.append("from bootCorrelationStream[logLevel=='ERROR']#window.externalTime(timestamp, 2 min) ");
>     executionPlane.append(" select similarityId, adanos:first(message) as message, uuid, ");
>     executionPlane.append(" min(timestamp) as startTime, max(timestamp) as endTime ");
>     executionPlane.append(" group by uuid, similarityId ");
>     executionPlane.append(" insert into tempStream; ");
>     appendLogs(executionPlane.toString());
>     return executionPlane.toString();
> }
>
> callback details
>
> Map<String, List<BootCorrelationAggregationData>> tempMap =
>     new HashMap<String, List<BootCorrelationAggregationData>>();
> for (Event event : inEvents) {
>     Object[] data = event.getData();
>     BootCorrelationAggregationData aggregateData = new BootCorrelationAggregationData();
>     aggregateData.setSimilarityId((String) data[0]);
>     aggregateData.setMessage((String) data[1]);
>     aggregateData.setUuid((String) data[2]);
>     aggregateData.setTimestamp((Long) data[3]);
>     if (!tempMap.containsKey(aggregateData.getUuid())) {
>         tempMap.put(aggregateData.getUuid(), new ArrayList<BootCorrelationAggregationData>());
>     }
>     tempMap.get(aggregateData.getUuid()).add(aggregateData);
> }
>
>     List<BootCorrelationAggregationData> emitList =
>             new ArrayList<BootCorrelationAggregationData>();
>     Iterator<Entry<String, List<BootCorrelationAggregationData>>> it =
>             tempMap.entrySet().iterator();
>     while (it.hasNext()) {
>         Entry<String, List<BootCorrelationAggregationData>> entry = it.next();
>         List<BootCorrelationAggregationData> temp = entry.getValue();
>         Collections.sort(temp, new Comparator<BootCorrelationAggregationData>() {
>             @Override
>             public int compare(BootCorrelationAggregationData o1,
>                     BootCorrelationAggregationData o2) {
>                 // Long.compare avoids the overflow that casting a long difference to int can cause
>                 return Long.compare(o1.getTimestamp(), o2.getTimestamp());
>             }
>         });
>
>         if (temp.size() > 0) {
>             String message = String.format("Count: %s, for %s", temp.size(),
>                     ToStringBuilder.reflectionToString(temp.get(0)));
>             LOG.info(message);
>             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
>             appendLogs(String.format("Count: %s; message=%s, timestamp=%s", temp.size(),
>                     temp.get(0).getMessage(), sdf.format(new Date(temp.get(0).getTimestamp()))));
>             emitList.add(temp.get(0));
>         }
>     }
>
>     if (emitList.size() > 0) {
>         this.getOutputCollector().emit(new Values(emitList));
>         LOG.info(String.format("BootCorrelationSiddhiBolt emit size: %s", emitList.size()));
>         for (BootCorrelationAggregationData data : emitList) {
>             LOG.info(String.format("BootCorrelationSiddhiBolt emits %s",
>                     ToStringBuilder.reflectionToString(data)));
>         }
>         }
>     }
>
> `
>
> Input data:
>
> logLevel="ERROR", message="ERROR first message", timestamp="2016-05-21 01:22:07.579", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR second message", timestamp="2016-05-21 01:22:08.314", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR third message", timestamp="2016-05-21 01:22:15.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
> logLevel="ERROR", message="ERROR fourth message", timestamp="2016-05-21 01:25:07.017", uuid="94350d43-8b6b-4669-8561-2f956b05a341"
>
>
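For anyone picking this up, here is a minimal plain-Java sketch (no Siddhi
dependency) of why a sliding window might produce one result per event while a
batch window produces one result per batch. It is only an illustration under
stated assumptions, not Siddhi's implementation: the class and method names are
hypothetical, the exact expiry boundary is a guess, all four events are treated
as one group (they share a uuid; similarityId is not shown in the input), and
the final batch is flushed at end of input, which a real engine would not do
until a later event arrives. Timestamps are the offsets in milliseconds of the
four input events from the first one (01:22:07.579).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class WindowSketch {
    // 2 min, as in the query's externalTime(timestamp, 2 min).
    static final long WINDOW_MS = 2 * 60 * 1000L;

    // Offsets (ms) of the four input events relative to the first event.
    static final long[] OFFSETS_MS = {0L, 735L, 7_438L, 179_438L};

    /**
     * Sliding window (assumed behaviour): every arriving event first expires
     * older events, then triggers one {min, max, count} result.
     * Assumes timestamps arrive in non-decreasing order.
     */
    static List<long[]> sliding(long[] timestamps) {
        Deque<Long> window = new ArrayDeque<>();
        List<long[]> results = new ArrayList<>();
        for (long ts : timestamps) {
            while (!window.isEmpty() && window.peekFirst() <= ts - WINDOW_MS) {
                window.pollFirst(); // event fell out of the 2-minute window
            }
            window.addLast(ts);
            // In-order input means the head is the min and the tail is the max.
            results.add(new long[] {window.peekFirst(), window.peekLast(), window.size()});
        }
        return results;
    }

    /**
     * Batch (tumbling) window (assumed behaviour): results are produced only
     * when a batch closes, so each batch yields a single {min, max, count}.
     */
    static List<long[]> batch(long[] timestamps) {
        List<long[]> results = new ArrayList<>();
        if (timestamps.length == 0) {
            return results;
        }
        long batchEnd = timestamps[0] + WINDOW_MS;
        long min = timestamps[0];
        long max = timestamps[0];
        long count = 0;
        for (long ts : timestamps) {
            if (ts >= batchEnd) {
                results.add(new long[] {min, max, count}); // close the current batch
                while (ts >= batchEnd) {
                    batchEnd += WINDOW_MS; // skip empty batches
                }
                min = ts;
                max = ts;
                count = 0;
            }
            min = Math.min(min, ts);
            max = Math.max(max, ts);
            count++;
        }
        results.add(new long[] {min, max, count}); // flush the last batch (sketch only)
        return results;
    }

    public static void main(String[] args) {
        System.out.println("sliding results: " + sliding(OFFSETS_MS).size());
        System.out.println("batch results:   " + batch(OFFSETS_MS).size());
    }
}
```

With the four input events, the sliding variant yields four results (one per
event) and the batch variant yields two (the first three events together, then
the fourth on its own). If one result per group per window is what is wanted,
the externalTimeBatch window mentioned in the code comment of the issue may be
the closer fit, though that should be confirmed against the Siddhi docs.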
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev