http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala deleted file mode 100644 index 4339e5a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -object testStreamUnionExpansion extends App{ - val config : Config = ConfigFactory.load - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).map1(a => "xyz") -} - -object testStreamGroupbyExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a)) - //env.execute -} - -object testStreamUnionAndGroupbyExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0) - tail1.streamUnion(List(tail2)).map1(a => "xyz") -// env.execute() -} - -/** - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object testAlertExpansion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1") - .flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") - .alertWithConsumer("s1", "alert1") - //env.execute -} - -/** - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object testAlertExpansionWithUnion extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") //.map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = true) - //env.execute -} - - -object testStreamUnionExpansionWithSharedSpout extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val source = env.fromSpout(TestSpout()) - val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)) - tail1.streamUnion(List(tail2)).map1(a => { - println(a) - "xyz" - }) -// env.execute -} - -object testStreamUnionExpansionWithSharedSpout_2 extends App{ - val config : Config = ConfigFactory.load; - val env = new StormExecutionEnvironment(config) - val source = env.fromSpout(TestSpout()) - val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - source.streamUnion(List(tail1)).map1(a => { - println(a) - "xyz" - }) -// env.execute -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala deleted file mode 100644 index ccd3deb..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.ConfigFactory -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -/** - * @since 12/5/15 - */ -object TestExecutionEnvironment extends App{ - val env0 = ExecutionEnvironments.get[StormExecutionEnvironment] - println(env0) - val config = ConfigFactory.load() - val env1 = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](config) - println(env1) - val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value")) - println(env2) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala deleted file mode 100644 index d785689..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestScala.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -trait A { val x: Int } -case class B(val x: Int, y: Int) extends A \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala deleted file mode 100644 index 231ebab..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormNodes.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import java.util - -import backtype.storm.spout.SpoutOutputCollector -import backtype.storm.task.TopologyContext -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichSpout -import backtype.storm.tuple.Fields -import com.typesafe.config.Config -import org.slf4j.LoggerFactory - -case class TestSpout() extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(TestSpout.getClass) - var _collector : SpoutOutputCollector = null - override def nextTuple : Unit = { - _collector.emit(util.Arrays.asList("abc")) - LOG.info("send spout data abc") - Thread.sleep(1000) - } - override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={ - declarer.declare(new Fields("value")) - } - override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={ - _collector = collector - } -} - -case class TestKeyValueSpout() extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(TestSpout.getClass) - var _collector : SpoutOutputCollector = null - var count : Int = 0 - override def nextTuple : Unit = { - if(count%3 == 0) { - _collector.emit(util.Arrays.asList("abc", new Integer(1))) - }else{ - _collector.emit(util.Arrays.asList("xyz", new Integer(1))) - } - count += 1; - LOG.info("send spout data abc/xyz") - Thread.sleep(1000) - } - override def declareOutputFields (declarer: OutputFieldsDeclarer): Unit ={ - declarer.declare(new Fields("word", "count")) - } - override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit ={ - _collector = collector - } -} - -case class EchoExecutor() extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(EchoExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(input.head.asInstanceOf[String])) - LOG.info("echo " + input.head) - } -} - -case class WordPrependExecutor(prefix : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(prefix + "_" + input.head)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordPrependForAlertExecutor(prefix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", prefix + "_" + input.head) - outputCollector.collect(Tuple2("key1",value)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordPrependForAlertExecutor2(prefix : String) extends StormStreamExecutor1[util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", prefix + "_" + input.head) - outputCollector.collect(Tuple1(value)) - LOG.info("preappend " + prefix + "_" + input.head) - } -} - -case class WordAppendExecutor(suffix : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - outputCollector.collect(Tuple1(input.head + "_" + suffix)) - LOG.info("append " + input.head + "_" + suffix) - } -} - -case class WordAppendForAlertExecutor(suffix : String) extends StormStreamExecutor2[String, util.SortedMap[Object, Object]] { - val LOG = LoggerFactory.getLogger(WordPrependExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple2[String, util.SortedMap[Object, Object]]]): Unit ={ - val value = new util.TreeMap[Object, Object]() - value.put("word", input.head + "_" + suffix) - outputCollector.collect(Tuple2("key1", value)) - LOG.info("append " + input.head + "_" + suffix) - } -} - -case class PatternAlertExecutor(pattern : String) extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(PatternAlertExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - LOG.info("send out " + input.head) - if(input.head.asInstanceOf[String].matches(pattern)){ - LOG.info("Alert hadppens for input " + input.head + " and for pattern " + pattern) - } - } -} - -case class GroupedEchoExecutor() extends StormStreamExecutor1[String] { - val LOG = LoggerFactory.getLogger(GroupedEchoExecutor.getClass) - var config : Config = null - override def prepareConfig(config : Config){this.config = config} - override def init {} - override def flatMap(input : Seq[AnyRef], outputCollector : Collector[Tuple1[String]]): Unit ={ - LOG.info("get " + input(0)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala deleted file mode 100644 index 424bdef..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.{Config, ConfigFactory} - -/** - * explicit union - * a.union(b,c).alert() means (a,b,c)'s output is united into alert() - * before running this testing, we should define in eagle service one policy and one stream schema - * 1. stream schema - * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]' - * 2. policy - * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\)==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]' - */ -object UnionForAlert extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a)) - tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false) - env.execute() -} - -/** - * test alert after flatMap - */ -object TestAlertAfterFlatMap extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()) - .flatMap(WordPrependForAlertExecutor("test")) - .alert(Seq("s1"), "alert1", consume = false) - //env.execute -} - -/** - * test alert after Map - */ -object TestAlertAfterMap extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()) - .flatMap(WordPrependForAlertExecutor2("test")) - .map2(a => ("key", a)) - .alert(Seq("s1"), "alert1", false) - //env.execute -} - -object StormRunnerWithoutSplitOrJoin extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")) - .flatMap(PatternAlertExecutor("test.*")) -// env.execute() -} - -object StormRunnerWithSplit extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor()) - toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*")) - toBeSplit.flatMap(WordAppendExecutor("test")) -// env.execute() -} - -object StormRunnerWithUnion extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test")) - val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test")) - tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*")) - env.execute() -} - -object StormRunnerWithFilter extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")). - filter(_=>false). - flatMap(PatternAlertExecutor("test.*")) - //env.execute -} - -object StormRunnerWithJavaExecutor extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")). - filter(_=>false). - flatMap(PatternAlertExecutor("test.*")) - //env.execute -} - -object StormRunnerWithKeyValueSpout extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).parallelism(2) - //env.execute -} - -object StormRunnerWithKeyValueSpoutRenameOutputFields extends App{ - val config : Config = ConfigFactory.load; - val env = ExecutionEnvironments.getStorm(config) - env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).parallelism(2) - //env.execute -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala deleted file mode 100644 index 48dc7e5..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.ConfigFactory -import org.scalatest.{FlatSpec, Matchers} - -class TestStreamDAGBuilder extends FlatSpec with Matchers{ -// "a single source DAG with groupBy" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val groupby = env.newSource(null).groupBy(0) -// groupby.flatMap(WordPrependExecutor("test")) -// groupby.flatMap(WordAppendExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source DAG without stream join" should "be traversed sequentially like specified" in{ -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match{ -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match{ -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source with split" should "has more than one tail producer" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val echo = env.newSource(null).flatMap(EchoExecutor()) -// val tail1 = echo.flatMap(WordPrependExecutor("test")) -// val tail2 = echo.flatMap(WordAppendExecutor("test")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// } -// -// "a single source with split and join" should "has join" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val echo = env.newSource(null).flatMap(EchoExecutor()) -// val tail1 = echo.flatMap(WordPrependExecutor("test")) -// val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)). -// flatMap(PatternAlertExecutor("test*")) -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t == null) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FilterProducer(fn) => -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*")) -// case _ => assert(false) -// } -// assert(!iter.hasNext) -// } -// -// "multiple sources with split and union" should "has union" in { -// val config = ConfigFactory.load() -// val env = ExecutionEnvironmentFactory.getStorm(config) -// val source1 = env.newSource(TestSpout()) -// val source2 = env.newSource(TestSpout()) -// val source3 = env.newSource(TestSpout()) -// -// val tail1 = source1.flatMap(WordPrependExecutor("test")) -// val tail2 = source2.filter(_=>true) -// val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)). -// flatMap(PatternAlertExecutor("abc*")) -// -// val dag = new StreamDAGBuilder(env.heads).build() -// val iter = dag.iterator() -// -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case StormSourceProducer(t) => assert(t.isInstanceOf[String]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FilterProducer(fn) => -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case SourceProducer(t) => assert(t.isInstanceOf[String]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor]) -// case _ => assert(false) -// } -// assert(iter.hasNext) -// iter.next() match { -// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*")) -// case _ => assert(false) -// } -// assert(!iter.hasNext) -// } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala deleted file mode 100644 index 7754765..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream - -import org.apache.eagle.datastream.core.StreamContext -import org.apache.eagle.datastream.storm.StormExecutionEnvironment - -/** - * @since 12/4/15 - */ -case class Entity(name:String,value:Double,var inc:Int=0) - -object TestIterableWithGroupBy extends App { - - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.from(tuples) - .groupByKey(_.name) - .map(o => {o.inc += 2;o}) - .filter(_.name != "b") - .filter(_.name != "c") - .groupByKey(o=>(o.name,o.value)) - .map(o => (o.name,o)) - .map(o => (o._1,o._2.value,o._2.inc)) - .foreach(println) - - env.execute() -} - -object TestIterableWithGroupByWithStreamContext extends App { - val stream = StreamContext(args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - stream.from(tuples) - .groupByKey(_.name) - .map(o => {o.inc += 2;o}) - .filter(_.name != "b") - .filter(_.name != "c") - .groupByKey(o=>(o.name,o.value)) - .map(o => (o.name,o)) - .map(o => (o._1,o._2.value,o._2.inc)) - .foreach(println) - - stream.submit[StormExecutionEnvironment] -} - -object TestIterableWithGroupByCircularly extends App{ - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.from(tuples,recycle = true) - .map(o => {o.inc += 2;o}) - .groupByKey(_.name) - .foreach(println) - env.execute() -} - -object TestGroupByKeyOnSpoutproxy extends App{ - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - - val tuples = Seq( - Entity("a", 1), - Entity("a", 2), - Entity("a", 3), - Entity("b", 2), - Entity("c", 3), - Entity("d", 3) - ) - - env.fromSpout[String](TestSpout()) - .groupByKey(_.charAt(0)) - .foreach(println) - env.execute() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java deleted file mode 100644 index fc2a016..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.core; - -public class StreamingProcessConstants { - public static final String EVENT_PARTITION_KEY = "eventPartitionKey"; - public static final String EVENT_STREAM_NAME = "streamName"; - public static final String EVENT_ATTRIBUTE_MAP = "value"; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java deleted file mode 100644 index 7e66478..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream; - -import java.util.List; - -public interface JavaMapper { - List<Object> map(List<Object> input); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java deleted file mode 100644 index 39ce6c2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.eagle.datastream; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @since 12/8/15 - */ -public interface JavaTypeCompatible { - Class<?> getType(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala deleted file mode 100644 index ddea46f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -trait FlatMapper[T] extends Serializable { - def flatMap(input : Seq[AnyRef], collector : Collector[T]) -} - -case class FlatMapperWrapper[T](func:(Any,Collector[T]) => Unit) extends FlatMapper[T]{ - override def flatMap(input: Seq[AnyRef], collector: Collector[T]): Unit = func(input,collector) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala deleted file mode 100644 index 836c7eb..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream - -import com.typesafe.config.Config -import scala.collection.JavaConverters._ - -trait StormStreamExecutor[R <: Any] extends FlatMapper[R] { - def prepareConfig(config : Config) - def init - def fields : Array[String] -} - -trait JavaStormStreamExecutor[R <: AnyRef] extends FlatMapper[R] { - def prepareConfig(config : Config) - def init - def fields : Array[String] - override def toString() = this.getClass.getSimpleName - - override def flatMap(input : Seq[AnyRef], collector : Collector[R]) = flatMap(input.asJava,collector) - - def flatMap(input : java.util.List[AnyRef], collector : Collector[R]) -} - -abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] { - override def fields = Array("f0") -} - -abstract class JavaStormStreamExecutor1[T0] extends JavaStormStreamExecutor[Tuple1[T0]] { - override def fields = Array("f0") -} - -abstract class StormStreamExecutor2[T0, T1] extends StormStreamExecutor[Tuple2[T0, T1]] { - override def fields = Array("f0", "f1") -} - -abstract class JavaStormStreamExecutor2[T0, T1] extends JavaStormStreamExecutor[Tuple2[T0, T1]] { - override def fields = Array("f0", "f1") -} - -abstract class StormStreamExecutor3[T0, T1, T2] extends StormStreamExecutor[Tuple3[T0, T1, T2]] { - override def fields = Array("f0", "f1", "f2") -} - -abstract class JavaStormStreamExecutor3[T0, T1, T2] extends JavaStormStreamExecutor[Tuple3[T0, T1, T2]] { - override def fields = Array("f0", "f1", "f2") -} - -abstract class StormStreamExecutor4[T0, T1, T2, T3] extends StormStreamExecutor[Tuple4[T0, T1, T2, T3]] { - override def fields = Array("f0", "f1", "f2", "f3") -} - -abstract class JavaStormStreamExecutor4[T0, T1, T2, T3] extends JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] { - override def fields = Array("f0", "f1", "f2", "f3") -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/pom.xml b/eagle-core/eagle-data-process/pom.xml index e32be21..60bfced 100644 --- a/eagle-core/eagle-data-process/pom.xml +++ b/eagle-core/eagle-data-process/pom.xml @@ -31,6 +31,5 @@ <module>eagle-storm-jobrunning-spout</module> <module>eagle-job-common</module> <module>eagle-stream-process-api</module> - <module>eagle-stream-pipeline</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java deleted file mode 100644 index 1ff8b27..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - - -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.common.metric.AlertContext; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.codehaus.jackson.annotate.JsonIgnore; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertdetail") -@ColumnFamily("f") -@Prefix("hadoop") -@Service(Constants.ALERT_SERVICE_ENDPOINT_NAME) -@TimeSeries(true) -@JsonIgnoreProperties(ignoreUnknown = true) -@Tags({ - "site", - "hostname", - "application", - "policyId", - "sourceStreams", - "alertSource", - "alertExecutorId" -}) -public class AlertAPIEntity extends TaggedLogAPIEntity{ - @Column("description") - private String description; - @Column("remediationID") - private String remediationID; - @Column("remediationCallback") - private String remediationCallback; - @Column("alertContext") - private String alertContext; - @Column("streamId") - private String streamId; - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - _pcs.firePropertyChange("description", null, null); - } - - public String getRemediationID() { - return remediationID; - } - - public void setRemediationID(String remediationID) { - this.remediationID = remediationID; - _pcs.firePropertyChange("remediationID", null, null); - } - - public String getRemediationCallback() { - return remediationCallback; - } - - public void setRemediationCallback(String remediationCallback) { - this.remediationCallback = remediationCallback; - _pcs.firePropertyChange("remediationCallback", null, null); - } - - @JsonIgnore - public String getAlertContext() { - return alertContext; - } - - @JsonProperty("alertContext") - public AlertContext getWrappedAlertContext() { - return AlertContext.fromJsonString(alertContext); - } - - @JsonIgnore - public void setAlertContext(String alertContext) { - this.alertContext = alertContext; - _pcs.firePropertyChange("alertContext", null, null); - } - @JsonProperty("alertContext") - public void setDecodedAlertContext(AlertContext alertContext) { - if(alertContext != null) this.alertContext = alertContext.toJsonString(); - _pcs.firePropertyChange("alertContext", null, null); - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java deleted file mode 100644 index f001725..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.eagle.common.metric.AlertContext; -import org.apache.hadoop.hbase.util.Bytes; - -import org.apache.eagle.log.entity.meta.EntitySerDeser; - -public class AlertContextSerDeser implements EntitySerDeser<AlertContext> { - - @Override - public AlertContext deserialize(byte[] bytes) { - AlertContext context = new AlertContext(); - Map<String, String> properties = new HashMap<String, String>(); - final int length = bytes.length; - if (length < 4) { return context; } - int size = Bytes.toInt(bytes, 0, 4); - - int offset = 4; - for (int i = 0; i < size; i++) { - int keySize = Bytes.toInt(bytes, offset, 4); - offset += 4; - int valueSize = Bytes.toInt(bytes, offset, 4); - offset += 4; - String key = Bytes.toString(bytes, offset, keySize); - offset += keySize; - String value =Bytes.toString(bytes, offset, valueSize); - offset += valueSize; - properties.put(key, value); - } - context.addAll(properties); - return context; - } - - @Override - public byte[] serialize(AlertContext context) { - - final Map<String, String> pair = context.getProperties(); - int totalSize = 4; - for (Entry<String, String> entry : pair.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - int keySize = 0; - if(key!=null) keySize = key.getBytes().length; - int valueSize = 0; - if(value!=null) valueSize = value.getBytes().length; - totalSize += keySize + valueSize + 8; - } - byte[] buffer = new byte[totalSize]; - - Bytes.putInt(buffer, 0, pair.size()); - int offset = 4; - for (Entry<String, String> entry : pair.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - int keySize = key !=null ? key.getBytes().length : 0; - int valueSize = value != null ? value.getBytes().length:0; - - Bytes.putInt(buffer, offset, keySize); - offset += 4; - Bytes.putInt(buffer, offset, valueSize); - offset += 4; - - - Bytes.putBytes(buffer, offset, key != null ? key.getBytes() : new byte[0], 0, keySize); - offset += keySize; - Bytes.putBytes(buffer, offset, value != null ? value.getBytes() : new byte[0], 0, valueSize); - offset += valueSize; - } - return buffer; - } - - - @Override - public Class<AlertContext> type(){ - return AlertContext.class; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java deleted file mode 100644 index 7f8dbe3..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertDataSource") -@ColumnFamily("f") -@Prefix("alertDataSource") -@Service(Constants.ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site", "dataSource"}) -@Deprecated -public class AlertDataSourceEntity extends TaggedLogAPIEntity{ - @Column("a") - private boolean enabled; - @Column("b") - private String config; - @Column("c") - private String description; - - public String getConfig() { - return config; - } - - public void setConfig(String config) { - this.config = config; - valueChanged("config"); - } - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - valueChanged("enabled"); - } - - public String getDescription() { - return description; - } - - public void setDescription(String desc) { - this.description = desc; - valueChanged("desc"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java deleted file mode 100644 index 036ec7a..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.log.entity.meta.Column; -import org.apache.eagle.log.entity.meta.ColumnFamily; -import org.apache.eagle.log.entity.meta.Index; -import org.apache.eagle.log.entity.meta.Indexes; -import org.apache.eagle.log.entity.meta.Prefix; -import org.apache.eagle.log.entity.meta.Service; -import org.apache.eagle.log.entity.meta.Table; -import org.apache.eagle.log.entity.meta.Tags; -import org.apache.eagle.log.entity.meta.TimeSeries; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -/** - * site: site name - * dataSource: data source name - * - * alertExecutorId: Group Policy by alertExecutorId, the policy definition with the sample ["site", "dataSource", "alertExecutorId"] should run on the sample alert executor - * - * policyId: policy name, should be unique - * policyType: policy engine implementation type - */ -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertdef") -@ColumnFamily("f") -@Prefix("alertdef") -@Service(Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site", "application", "alertExecutorId", "policyId", "policyType"}) -@Indexes({ - @Index(name="Index_1_alertExecutorId", columns = { "alertExecutorID" }, unique = true), -}) -public class AlertDefinitionAPIEntity extends AbstractPolicyDefinitionEntity { - @Column("a") - private String description; - @Column("b") - private String policyDef; - @Column("c") - private String dedupeDef; - @Column("d") - private String notificationDef; - @Column("e") - private String remediationDef; - @Column("f") - private boolean enabled; - @Column("g") - private String owner; - @Column("h") - private long lastModifiedDate; - @Column("i") - private long severity; - @Column("j") - private long createdTime; - @Column("k") - private boolean markdownEnabled; - @Column("l") - private String markdownReason; - - public String getDescription() { - return description; - } - public void setDescription(String desc) { - this.description = desc; - valueChanged("description"); - } - - public String getPolicyDef() { - return policyDef; - } - public void setPolicyDef(String policyDef) { - this.policyDef = policyDef; - valueChanged("policyDef"); - } - public String getDedupeDef() { - return dedupeDef; - } - public void setDedupeDef(String dedupeDef) { - this.dedupeDef = dedupeDef; - valueChanged("dedupeDef"); - } - public String getNotificationDef() { - return notificationDef; - } - public void setNotificationDef(String notificationDef) { - this.notificationDef = notificationDef; - valueChanged("notificationDef"); - } - public String getRemediationDef() { - return remediationDef; - } - public void setRemediationDef(String remediationDef) { - this.remediationDef = remediationDef; - valueChanged("remediationDef"); - } - public boolean isEnabled() { - return enabled; - } - public void setEnabled(boolean enabled) { - this.enabled = enabled; - valueChanged("enabled"); - } - public String getOwner() { - return owner; - } - public void setOwner(String owner) { - this.owner = owner; - valueChanged("owner"); - } - public long getLastModifiedDate() { - return lastModifiedDate; - } - public void setLastModifiedDate(long lastModifiedDate) { - this.lastModifiedDate = lastModifiedDate; - valueChanged("lastModifiedDate"); - } - public long getSeverity() { - return severity; - } - public void setSeverity(long severity) { - this.severity = severity; - valueChanged("severity"); - } - public long getCreatedTime() { - return createdTime; - } - public void setCreatedTime(long createdTime) { - this.createdTime = createdTime; - valueChanged("createdTime"); - } - public boolean isMarkdownEnabled() { return markdownEnabled; } - public void setMarkdownEnabled(boolean markdownEnabled) { - this.markdownEnabled = markdownEnabled; - valueChanged("markdownEnabled"); - } - public String getMarkdownReason() { return markdownReason; } - public void setMarkdownReason(String markdownReason) { - this.markdownReason = markdownReason; - valueChanged("markdownReason"); - } - - public boolean equals(Object o){ - if(o == this) - return true; - if(!(o instanceof AlertDefinitionAPIEntity)) - return false; - AlertDefinitionAPIEntity that = (AlertDefinitionAPIEntity)o; - if(that.enabled == this.enabled && - compare(that.policyDef, this.policyDef) && - compare(that.dedupeDef, this.dedupeDef) && - compare(that.notificationDef, this.notificationDef) && - compare(that.remediationDef, this.remediationDef)) - return true; - return false; - } - - private boolean compare(String a, String b){ - if(a == b) - return true; - if(a == null || b == null) - return false; - if(a.equals(b)) - return true; - return false; - } - - public int hashCode(){ - HashCodeBuilder builder = new HashCodeBuilder(); - builder.append(enabled); - builder.append(policyDef); - builder.append(dedupeDef); - builder.append(notificationDef); - builder.append(remediationDef); - return builder.toHashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java deleted file mode 100644 index 507c482..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.common.metric.AlertContext; -import org.apache.eagle.log.entity.repo.EntityRepository; - -public class AlertEntityRepository extends EntityRepository { - public AlertEntityRepository() { - serDeserMap.put(AlertContext.class, new AlertContextSerDeser()); - entitySet.add(AlertAPIEntity.class); - entitySet.add(AlertDefinitionAPIEntity.class); - entitySet.add(AlertStreamSchemaEntity.class); - entitySet.add(AlertStreamEntity.class); - //entitySet.add(AlertDataSourceEntity.class); - entitySet.add(AlertExecutorEntity.class); - entitySet.add(ApplicationDescServiceEntity.class); - entitySet.add(FeatureDescServiceEntity.class); - entitySet.add(SiteDescServiceEntity.class); - entitySet.add(SiteApplicationServiceEntity.class); - this.registerEntity(GenericResourceEntity.class); - entitySet.add(AlertNotificationEntity.class); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java deleted file mode 100644 index 7e023e6..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -/** - * Stream and Alert executor Id mapping should be automatically created by topology definition - */ -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertExecutor") -@ColumnFamily("f") -@Prefix("alertExecutor") -@Service(Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site","application", "alertExecutorId", "streamName"}) -public class AlertExecutorEntity extends TaggedLogAPIEntity{ - @Column("a") - private String description; - - public String getDescription() { - return description; - } - - public void setDescription(String desc) { - this.description = desc; - valueChanged("description"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java deleted file mode 100644 index 2782be2..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.entity; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import java.util.List; -import java.util.Map; - - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertNotifications") -@ColumnFamily("f") -@Prefix("alertNotifications") -@Service(Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"notificationType"}) -public class AlertNotificationEntity extends TaggedLogAPIEntity { - @Column("a") - private String className; - public String getClassName(){ - return className; - } - public void setClassName(String className){ - this.className = className; - valueChanged("className"); - } - - @Column("b") - private String description; - public String getDescription(){ - return description; - } - public void setDescription(String description){ - this.description = description; - valueChanged("description"); - } - - @Column("c") - private boolean enabled; - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - valueChanged("enabled"); - } - - @Column("d") - private List<Map<String, String>> fields; - public List<Map<String, String>> getFields() { - return fields; - } - - public void setFields(List<Map<String, String>> fields) { - this.fields = fields; - valueChanged("fields"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java deleted file mode 100644 index 685da4f..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.Column; -import org.apache.eagle.log.entity.meta.ColumnFamily; -import org.apache.eagle.log.entity.meta.Prefix; -import org.apache.eagle.log.entity.meta.Service; -import org.apache.eagle.log.entity.meta.Table; -import org.apache.eagle.log.entity.meta.Tags; -import org.apache.eagle.log.entity.meta.TimeSeries; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertStream") -@ColumnFamily("f") -@Prefix("alertStream") -@Service(Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site","application", "streamName"}) -public class AlertStreamEntity extends TaggedLogAPIEntity{ - @Column("a") - private String description; - - public String getDescription() { - return description; - } - public void setDescription(String description) { - this.description = description; - valueChanged("description"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java deleted file mode 100644 index 54ff203..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/ApplicationDescServiceEntity.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.entity; - - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import java.util.List; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("eagleApplicationDesc") -@ColumnFamily("f") -@Prefix("eagleApplicationDesc") -@Service(Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"application"}) -public class ApplicationDescServiceEntity extends TaggedLogAPIEntity { - @Column("a") - private String description; - @Column("b") - private String alias; - @Column("c") - private String groupName; - @Column("d") - private List<String> features; - @Column("e") - private String config; - - public String getConfig() { - return config; - } - - public void setConfig(String config) { - this.config = config; - valueChanged("config"); - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - valueChanged("description"); - } - - public String getGroupName() { - return groupName; - } - - public void setGroupName(String groupName) { - this.groupName = groupName; - valueChanged("groupName"); - } - - public String getAlias() { - return alias; - } - - public void setAlias(String alias) { - this.alias = alias; - valueChanged("alias"); - } - - public List<String> getFeatures() { - return features; - } - - public void setFeatures(List<String> features) { - this.features = features; - valueChanged("features"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java deleted file mode 100644 index 0f3c738..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/FeatureDescServiceEntity.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.entity; - - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("eagleFeatureDesc") -@ColumnFamily("f") -@Prefix("eagleFeatureDesc") -@Service(Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"feature"}) -public class FeatureDescServiceEntity extends TaggedLogAPIEntity { - @Column("a") - private String description; - @Column("b") - private String version; - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - valueChanged("version"); - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - valueChanged("description"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java deleted file mode 100644 index c855499..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/GenericResourceEntity.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.entity; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("eagle_metadata") -@ColumnFamily("f") -@Prefix("generic_resource") -@Service(Constants.GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site", "name"}) -public class GenericResourceEntity extends TaggedLogAPIEntity { - @Column("a") - private String value; - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - valueChanged("value"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java deleted file mode 100644 index 5eea4b3..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/SiteApplicationServiceEntity.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.entity; - - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - - -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("eagleSiteApplication") -@ColumnFamily("f") -@Prefix("eagleSiteApplication") -@Service(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site", "application"}) -public class SiteApplicationServiceEntity extends TaggedLogAPIEntity { - @Column("a") - private String config; - @Column("b") - private Boolean enabled; - - public String getConfig() { - return config; - } - - public void setConfig(String config) { - this.config = config; - valueChanged("config"); - } - - public Boolean getEnabled() { - return enabled; - } - - public void setEnabled(Boolean enabled) { - this.enabled = enabled; - valueChanged("enabled"); - } -}
