Hi All,

I need to get the current time when executing a siddhi query in CEP 3.0.0.
For that I wrote a custom function called 'now()' as below:

import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "stratos", function = "now")
public class TimeWindowProcessor extends FunctionExecutor {
    Attribute.Type returnType = Attribute.Type.LONG;
    @Override
    public void init(Attribute.Type[] types, SiddhiContext siddhiContext) {
    }

    @Override
    protected Object process(Object obj) {
        return System.currentTimeMillis();
    }

    @Override
    public void destroy() {
    }

    @Override
    public Attribute.Type getReturnType() {
        return returnType;
    }
}


Then I tried to use the above function in siddhi query as below:

from avg_rif_stat
   select cluster_id, cluster_instance_id, network_partition_id,
in_flight_request_count,
   stratos:concat(cluster_id, '-' ,cluster_instance_id) as
avg_rif_cluster_network
   insert into avg_rif_concat;
define partition avg_rif_cluster_partition by
avg_rif_concat.avg_rif_cluster_network;
from avg_rif_concat#window.timeBatch(1 min)
   select *stratos:now() as timestamp*, cluster_id,
cluster_instance_id, network_partition_id,
avg(in_flight_request_count) as count
   insert into average_in_flight_requests
   partition by avg_rif_cluster_partition;

When I stratos server startup I'm getting the below exception:

[2015-10-16 14:24:21,940]  INFO
{org.wso2.carbon.event.processor.core.internal.listener.EventReceiverStreamNotificationListenerImpl}
-  Trying to redeploy configuration files for stream:
average_in_flight_requests:1.0.0
[2015-10-16 14:24:21,941] ERROR
{org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService}
-  Externally defined stream:
StreamDefinition{
        streamId='average_in_flight_requests:1.0.0',
        name='average_in_flight_requests',
        version='1.0.0',
        nickName='average in-flight requests',
        description='average of in-flight request count',
        tags=null,
        metaData=null,
        correlationData=null,
        *payloadData=[Attribute{name='timestamp', type=LONG},
Attribute{name='cluster_id', type=STRING},
Attribute{name='cluster_instance_id', type=STRING},
Attribute{name='network_partition_id', type=STRING},
Attribute{name='count', type=DOUBLE}],*
}
 is different from the existing stream :
StreamDefinition{
        streamId='average_in_flight_requests:1.0.0',
        name='average_in_flight_requests',
        version='1.0.0',
        nickName='null',
        description='null',
        tags=null,
        metaData=null,
        correlationData=null,
        *payloadData=[Attribute{name='timestamp', type=LONG}],*
}

When events are being published following exception is thrown:
[2015-10-16 15:56:57,716] ERROR
{org.wso2.siddhi.core.query.processor.window.TimeBatchWindowProcessor} -
 Index: 1, Size: 1
java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at
org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:282)
at
org.wso2.siddhi.core.query.processor.window.TimeBatchWindowProcessor.run(TimeBatchWindowProcessor.java:115)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I think I have a problem in using the custom function now() in siddhi
query. *If we don't have a parameter to pass in custom method, shall I use
it as now()?*

Please suggest a way to fix this problem.

Thanks.


-- 
Thanuja Uruththirakodeeswaran
Software Engineer
WSO2 Inc.;http://wso2.com
lean.enterprise.middleware

mobile: +94 774363167
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to