package cep;

import java.util.Arrays;

import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.config.SiddhiConfiguration;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;



/**
 * Stand-alone demo that wires a small autoscaling scenario into a Siddhi
 * engine and pushes a handful of hand-written events through it.
 *
 * <p>The execution plan built in {@link #main(String[])} consists of:
 * <ul>
 *   <li>stream {@code InstanceHealth} - per-instance "up"/"down" notifications;</li>
 *   <li>stream {@code StratosHealth} - per-domain connection counts;</li>
 *   <li>table {@code liveInstances} - filled from "up" events and pruned by "down" events;</li>
 *   <li>stream {@code AutoscaleData} - {@code StratosHealth} joined with {@code liveInstances},
 *       carrying the instance count per domain together with the connection count;</li>
 *   <li>stream {@code Actions} - {@code AutoscaleData} events whose connections exceed
 *       {@code 300 * instanceCount};</li>
 *   <li>stream {@code temp} / {@code tStream} - a trigger stream joined with
 *       {@code liveInstances} so that the table rows can be printed.</li>
 * </ul>
 * Everything received on {@code Actions}, {@code tStream} and {@code AutoscaleData}
 * is printed to standard output.
 */
public class SiddhiQuery1 {

    /**
     * Builds the execution plan, sends the sample events and shuts the engine down.
     *
     * @param args ignored
     * @throws Exception if defining the plan or sending an event fails; the Siddhi
     *                   manager is shut down before the exception propagates
     */
    public static void main(String[] args) throws Exception {
        SiddhiConfiguration configuration = new SiddhiConfiguration();
        SiddhiManager siddhiManager = new SiddhiManager(configuration);

        // Everything after construction runs inside try/finally so that the manager
        // is shut down even when a definition, query or send call throws.
        try {
            // --- stream and table definitions ---------------------------------
            InputHandler instanceHealth = siddhiManager
                    .defineStream("define stream InstanceHealth ( domain string, ts long, action string, ip string)");
            InputHandler stratosHealth = siddhiManager
                    .defineStream("define stream StratosHealth ( domain string, ts long, connections int)");
            siddhiManager.defineTable("define table liveInstances (ip string, domain string);");

            InputHandler tableDumpTrigger = siddhiManager.defineStream("define stream temp ( t string)");

            // --- maintain the liveInstances table from up/down notifications ----
            siddhiManager.addQuery("from InstanceHealth[action=='up'] select ip as ip, domain as domain insert into liveInstances");
            siddhiManager.addQuery("from InstanceHealth[action=='down'] delete liveInstances on ip==InstanceHealth.ip");

            // --- join each health event with the table to count instances per domain ---
            siddhiManager.addQuery("from StratosHealth#window.length(0)  as h join liveInstances as ls "
                    + "on h.domain==ls.domain "
                    + "select count(ls.ip) as instanceCount, h.connections as connections group by ls.domain "
                    + "insert into AutoscaleData for current-events");

            // --- raise an action when connections exceed 300 per counted instance ---
            siddhiManager.addQuery("from AutoscaleData[connections >300*instanceCount] select connections insert into Actions");
            siddhiManager.addCallback("Actions", printingCallback("Actions:"));

            // --- any event on 'temp' is joined with the table and emitted on tStream ---
            siddhiManager.addQuery("from temp unidirectional join liveInstances select ip as ip, domain as domain insert into tStream");
            siddhiManager.addCallback("tStream", printingCallback("log:"));

            siddhiManager.addCallback("AutoscaleData", printingCallback("AutoscaleData:"));

            // --- sample input -------------------------------------------------
            instanceHealth.send(new Object[]{"qsb", System.currentTimeMillis(), "up", "192.121.0.1"});
            instanceHealth.send(new Object[]{"qsb", System.currentTimeMillis(), "up", "192.121.0.2"});

            // Enable the next line to exercise the "down" (delete from table) query:
            // instanceHealth.send(new Object[]{"qsb", System.currentTimeMillis(), "down", "192.121.0.1"});

            stratosHealth.send(new Object[]{"qsb", System.currentTimeMillis(), 400});
            System.out.println("============1=============");
            stratosHealth.send(new Object[]{"qsb", System.currentTimeMillis(), 500});
            System.out.println("============2=============");
            stratosHealth.send(new Object[]{"qsb", System.currentTimeMillis(), 600});

            // Trigger the table dump query.
            tableDumpTrigger.send(new Object[]{"1"});
        } finally {
            siddhiManager.shutdown();
        }
    }

    /**
     * Creates a callback that prints every received batch of events to standard
     * output as {@code prefix + Arrays.toString(events)}.
     *
     * @param prefix text written in front of the event array
     * @return a new callback instance
     */
    private static StreamCallback printingCallback(final String prefix) {
        return new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                System.out.println(prefix + Arrays.toString(events));
            }
        };
    }

}
