Repository: incubator-eagle Updated Branches: refs/heads/master 7499be694 -> 8d7f81e1c
[EAGLE-776] add unit test for eagle-alert-parent add unit test for eagle-alert-parent https://issues.apache.org/jira/browse/EAGLE-776 Author: koone <luokun1...@126.com> Closes #673 from koone/EAGLE-777. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8d7f81e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8d7f81e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8d7f81e1 Branch: refs/heads/master Commit: 8d7f81e1cdcc61e16769677dda2c371897f25dca Parents: 7499be6 Author: koone <luokun1...@126.com> Authored: Wed Nov 23 14:29:48 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Wed Nov 23 14:29:48 2016 +0800 ---------------------------------------------------------------------- .../alert-metadata-service/pom.xml | 13 +- .../impl/TopologyMgmtResourceImplTest.java | 72 ++++ .../src/test/resources/application.conf | 10 +- .../alert-metadata/pom.xml | 12 + .../eagle/alert/metadata/MetadataUtils.java | 1 + .../metadata/impl/InMemMetadataDaoImpl.java | 6 +- .../eagle/alert/metadata/TestMetadataUtils.java | 59 ++++ .../eagle/alert/metadata/impl/InMemoryTest.java | 117 +++++++ .../eagle/alert/metadata/impl/JdbcImplTest.java | 164 +++++++++ .../alert/metadata/impl/MongoImplTest.java | 344 ++++++++++++++++++ .../alert/resource/impl/InMemoryTest.java | 48 --- .../alert/resource/impl/JdbcImplTest.java | 165 --------- .../alert/resource/impl/MongoImplTest.java | 345 ------------------- 13 files changed, 791 insertions(+), 565 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml index 9d5e8f1..cf1f0fc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml @@ -84,7 +84,18 @@ <groupId>io.swagger</groupId> <artifactId>swagger-jaxrs</artifactId> </dependency> - + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java index e46213e..b9a7634 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java @@ -18,13 +18,40 @@ package org.apache.eagle.service.topology.resource.impl; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.ClusterSummary; +import backtype.storm.generated.Nimbus; +import backtype.storm.generated.StormTopology; +import backtype.storm.generated.TopologySummary; +import backtype.storm.utils.NimbusClient; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl; +import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import static org.powermock.api.mockito.PowerMockito.when; +@RunWith(PowerMockRunner.class) +@PrepareForTest({TopologyMgmtResourceImpl.class, StormSubmitter.class}) public class TopologyMgmtResourceImplTest { TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl(); String topologyName = "testStartTopology"; @@ -52,4 +79,49 @@ public class TopologyMgmtResourceImplTest { List<TopologyStatus> topologies = topologyManager.getTopologies(); Assert.assertTrue(topologies.size() == 1); } + + @Test + public void testGetTopologies1() throws Exception { + IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); + TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl(); + Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao"); + daoField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL); + daoField.set(null, dao); + // set data + Topology topology = new Topology("test", 1, 1); + StreamingCluster cluster =new StreamingCluster(); + dao.clear(); + dao.addTopology(topology); + dao.addCluster(cluster); + TopologyMgmtResourceImpl spy = PowerMockito.spy(service); + PowerMockito.doReturn(new TopologySummary()).when(spy,"getTopologySummery", Mockito.anyCollection(), Mockito.any(Topology.class)); + Assert.assertEquals(1, spy.getTopologies().size()); + } + + @Test + public void testStartTopology1() throws Exception { + IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao(); + TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl(); + Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao"); + daoField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL); + daoField.set(null, dao); + // set data + Topology topology = new Topology("test", 1, 1); + StreamingCluster cluster =new StreamingCluster(); + dao.clear(); + dao.addTopology(topology); + dao.addCluster(cluster); + PowerMockito.mockStatic(StormSubmitter.class); + PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",Mockito.eq("test"), Mockito.anyMap(), Mockito.any(StormTopology.class)); + TopologyMgmtResourceImpl spy = PowerMockito.spy(service); + PowerMockito.doReturn(null).when(spy,"createTopology", Mockito.any(Topology.class)); + spy.startTopology("test"); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf index f760241..1b6a281 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf @@ -13,8 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -{ - "datastore": { - "metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl" +metadata { + metadataDao = org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl + jdbc { + url = "localhost:27017" + } + properties { + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml index 07f373a..ebe24e2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml @@ -64,6 +64,18 @@ <artifactId>guice</artifactId> <version>3.0</version> </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java index 3e03b57..be22280 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java @@ -75,6 +75,7 @@ public class MetadataUtils { } public static Connection getJdbcConnection(Config config) { + Connection connection = null; try { if (config.hasPath(JDBC_USERNAME_PATH)) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java index b608516..611bbb4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java @@ -16,6 +16,8 @@ */ package org.apache.eagle.alert.metadata.impl; +import com.google.inject.Inject; +import com.typesafe.config.Config; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; @@ -26,8 +28,6 @@ import org.apache.eagle.alert.metadata.IMetadataDao; import org.apache.eagle.alert.metadata.MetadataUtils; import org.apache.eagle.alert.metadata.resource.Models; import org.apache.eagle.alert.metadata.resource.OpResult; -import com.google.inject.Inject; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,7 +296,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao { } @Override - public OpResult clear() { + public synchronized OpResult clear() { LOG.info("clear models..."); this.assignments.clear(); this.clusters.clear(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java new file mode 100644 index 0000000..1191dcb --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.metadata; + +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Created by luokun on 2016/11/16. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(MetadataUtils.class) +public class TestMetadataUtils { + + @Rule + public ExpectedException thrown= ExpectedException.none(); + + @Test + public void testGetKey() throws Exception { + StreamDefinition stream = new StreamDefinition(); + Assert.assertNull(MetadataUtils.getKey(stream)); + PolicyAssignment policyAssignment = new PolicyAssignment(); + policyAssignment.setPolicyName("test"); + Assert.assertEquals("test", MetadataUtils.getKey(policyAssignment)); + ScheduleState scheduleState = new ScheduleState(); + scheduleState.setVersion("1.0"); + Assert.assertEquals("1.0", MetadataUtils.getKey(scheduleState)); + } + + @Test + public void testGetKeyThrowable() { + thrown.expect(RuntimeException.class); + Object obj = new Object(); + MetadataUtils.getKey(obj); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java new file mode 100644 index 0000000..7655f54 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.metadata.impl; + +import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.alert.metadata.resource.OpResult; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @since May 1, 2016 + */ +public class InMemoryTest { + + private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load()); + + @Test + public void test_AddPolicy() { + + LoggerFactory.getLogger(InMemoryTest.class); + + MetadataDaoFactory.getInstance().getMetadataDao(); + + PolicyDefinition pd = new PolicyDefinition(); + pd.setName("pd1"); + dao.addPolicy(pd); + + Assert.assertEquals(1, dao.listPolicies().size()); + } + + @Test + public void testAddCluster(){ + StreamingCluster cluster1 = new StreamingCluster(); + cluster1.setName("test1"); + StreamingCluster cluster2 = new StreamingCluster(); + cluster2.setName("test2"); + StreamingCluster cluster3 = new StreamingCluster(); + cluster3.setName("test2"); + OpResult opResult1 = dao.addCluster(cluster1); + Assert.assertEquals(OpResult.SUCCESS,opResult1.code); + OpResult opResult2 = dao.addCluster(cluster2); + Assert.assertEquals(OpResult.SUCCESS,opResult2.code); + OpResult opResult3 = dao.addCluster(cluster3); + Assert.assertEquals(OpResult.SUCCESS,opResult3.code); + Assert.assertTrue(opResult3.message.contains("replace")); + dao.clear(); + } + + @Test + public void testRemoveDataSource(){ + Kafka2TupleMetadata dataSource1 = new Kafka2TupleMetadata(); + Kafka2TupleMetadata dataSource2 = new Kafka2TupleMetadata(); + dataSource1.setName("test1"); + dataSource2.setName("test2"); + dao.addDataSource(dataSource1); + dao.addDataSource(dataSource2); + OpResult opResult1 = dao.removeDataSource("test1"); + Assert.assertEquals(OpResult.SUCCESS, opResult1.code); + OpResult opResult2 = dao.removeDataSource("test1"); + Assert.assertEquals(OpResult.SUCCESS, opResult2.code); + Assert.assertTrue(opResult2.message.contains("no configuration")); + dao.clear(); + } + + @Test + public void testListAlertPublishEvent(){ + dao.addAlertPublishEvent(new AlertPublishEvent()); + dao.addAlertPublishEvent(new AlertPublishEvent()); + Assert.assertEquals(2,dao.listAlertPublishEvent(5).size()); + } + + @Test + public void testGetAlertPublishEventByPolicyId(){ + AlertPublishEvent alert1 = new AlertPublishEvent(); + AlertPublishEvent alert2 = new AlertPublishEvent(); + alert1.setAlertId("1"); + alert1.setPolicyId("1"); + alert2.setAlertId("2"); + alert2.setPolicyId("1"); + dao.addAlertPublishEvent(alert1); + dao.addAlertPublishEvent(alert2); + Assert.assertNotNull(dao.getAlertPublishEvent("1")); + Assert.assertEquals(2, dao.getAlertPublishEventByPolicyId("1", 2).size()); + } + + @Test + public void testAddScheduleState(){ + ScheduleState scheduleState = new ScheduleState(); + scheduleState.setVersion("1"); + Assert.assertEquals(OpResult.SUCCESS,dao.addScheduleState(scheduleState).code); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java new file mode 100644 index 0000000..7a2fcb5 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.metadata.impl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.PublishmentType; +import org.apache.eagle.alert.engine.coordinator.StreamingCluster; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.alert.metadata.MetadataUtils; +import org.apache.eagle.alert.metadata.resource.OpResult; +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; +import java.util.List; + +public class JdbcImplTest { + private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class); + static IMetadataDao dao; + + @BeforeClass + public static void setup() { + System.setProperty("config.resource", "/application-mysql.conf"); + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.load(); + dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA)); + } + + @AfterClass + public static void teardown() { + if (dao != null) { + try { + dao.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private String TOPO_NAME = "topoName"; + + @Ignore + @Test + public void test_apis() { + // publishment + { + Publishment publishment = new Publishment(); + publishment.setName("pub-"); + OpResult result = dao.addPublishment(publishment); + Assert.assertEquals(200, result.code); + List<Publishment> assigns = dao.listPublishment(); + Assert.assertEquals(1, assigns.size()); + result = dao.removePublishment("pub-"); + Assert.assertTrue(200 == result.code); + } + // topology + { + OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5)); + System.out.println(result.message); + Assert.assertEquals(200, result.code); + List<Topology> topos = dao.listTopologies(); + Assert.assertEquals(1, topos.size()); + // add again: replace existing one + result = dao.addTopology(new Topology(TOPO_NAME, 4, 5)); + topos = dao.listTopologies(); + Assert.assertEquals(1, topos.size()); + Assert.assertEquals(TOPO_NAME, topos.get(0).getName()); + Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt()); + } + // assignment + { + PolicyAssignment assignment = new PolicyAssignment(); + assignment.setPolicyName("policy1"); + OpResult result = dao.addAssignment(assignment); + Assert.assertEquals(200, result.code); + List<PolicyAssignment> assigns = dao.listAssignments(); + Assert.assertEquals(1, assigns.size()); + } + // cluster + { + StreamingCluster cluster = new StreamingCluster(); + cluster.setName("dd"); + OpResult result = dao.addCluster(cluster); + Assert.assertEquals(200, result.code); + List<StreamingCluster> assigns = dao.listClusters(); + Assert.assertEquals(1, assigns.size()); + } + // data source + { + Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata(); + dataSource.setName("ds"); + OpResult result = dao.addDataSource(dataSource); + Assert.assertEquals(200, result.code); + List<Kafka2TupleMetadata> assigns = dao.listDataSources(); + Assert.assertEquals(1, assigns.size()); + } + // policy + { + PolicyDefinition policy = new PolicyDefinition(); + policy.setName("ds"); + OpResult result = dao.addPolicy(policy); + Assert.assertEquals(200, result.code); + List<PolicyDefinition> assigns = dao.listPolicies(); + Assert.assertEquals(1, assigns.size()); + } + + // publishmentType + { + PublishmentType publishmentType = new PublishmentType(); + publishmentType.setType("KAFKA"); + OpResult result = dao.addPublishmentType(publishmentType); + Assert.assertEquals(200, result.code); + List<PublishmentType> assigns = dao.listPublishmentType(); + Assert.assertEquals(1, assigns.size()); + } + } + + private void test_addstate() { + ScheduleState state = new ScheduleState(); + String versionId = "state-" + System.currentTimeMillis(); + state.setVersion(versionId); + state.setGenerateTime(String.valueOf(new Date().getTime())); + OpResult result = dao.addScheduleState(state); + Assert.assertEquals(200, result.code); + state = dao.getScheduleState(); + Assert.assertEquals(state.getVersion(), versionId); + } + + @Ignore + @Test + public void test_readCurrentState() { + test_addstate(); + ScheduleState state = dao.getScheduleState(); + Assert.assertNotNull(state); + + LOG.debug(state.getVersion()); + LOG.debug(state.getGenerateTime()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java new file mode 100644 index 0000000..3b3ddf9 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.metadata.impl; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import de.flapdoodle.embed.mongo.MongodExecutable; +import de.flapdoodle.embed.mongo.MongodProcess; +import de.flapdoodle.embed.mongo.MongodStarter; +import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; +import de.flapdoodle.embed.mongo.config.Net; +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.process.runtime.Network; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.eagle.alert.coordination.model.*; +import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.coordination.model.internal.StreamGroup; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.AlertPublishEvent; +import org.apache.eagle.alert.metadata.IMetadataDao; +import org.apache.eagle.alert.metadata.MetadataUtils; +import org.apache.eagle.alert.metadata.resource.OpResult; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * @since May 1, 2016 + */ +public class MongoImplTest { + private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class); + static IMetadataDao dao; + + private static MongodExecutable mongodExe; + private static MongodProcess mongod; + + public static void before() { + try { + MongodStarter starter = MongodStarter.getDefaultInstance(); + mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1) + .net(new Net(27017, Network.localhostIsIPv6())).build()); + mongod = mongodExe.start(); + } catch (Exception e) { + LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e); + } + } + + @BeforeClass + public static void setup() { + before(); + + System.setProperty("config.resource", "/application-mongo.conf"); + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.load(); + dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA)); + + } + + @AfterClass + public static void teardown() { + if (mongod != null) { + try { + mongod.stop(); + } catch (IllegalStateException e) { + // catch this exception for the unstable stopping mongodb + // reason: the exception is usually thrown out with below message format when stop() returns null value, + // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying + // the process ultimately + if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) { + // if matches, do nothing, just ignore the exception + } else { + LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e))); + } + } + mongodExe.stop(); + } + } + + private String TOPO_NAME = "topoName"; + + @Test + public void test_apis() throws Exception { + // topology + { + OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5)); + System.out.println(result.message); + Assert.assertEquals(200, result.code); + List<Topology> topos = dao.listTopologies(); + Assert.assertEquals(1, topos.size()); + + result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5)); + topos = dao.listTopologies(); + Assert.assertEquals(2, topos.size()); + // add again: replace existing one + result = dao.addTopology(new Topology(TOPO_NAME, 4, 5)); + topos = dao.listTopologies(); + Assert.assertEquals(2, topos.size()); + Assert.assertEquals(TOPO_NAME, topos.get(0).getName()); + Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt()); + } + // assignment + { + PolicyAssignment assignment = new PolicyAssignment(); + assignment.setPolicyName("policy1"); + OpResult result = dao.addAssignment(assignment); + Assert.assertEquals(200, result.code); + List<PolicyAssignment> assigns = dao.listAssignments(); + Assert.assertEquals(1, assigns.size()); + } + // cluster + { + StreamingCluster cluster = new StreamingCluster(); + cluster.setName("dd"); + OpResult result = dao.addCluster(cluster); + Assert.assertEquals(200, result.code); + List<StreamingCluster> assigns = dao.listClusters(); + Assert.assertEquals(1, assigns.size()); + } + // data source + { + Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata(); + dataSource.setName("ds"); + OpResult result = dao.addDataSource(dataSource); + Assert.assertEquals(200, result.code); + List<Kafka2TupleMetadata> assigns = dao.listDataSources(); + Assert.assertEquals(1, assigns.size()); + } + // policy + { + PolicyDefinition policy = new PolicyDefinition(); + policy.setName("ds"); + OpResult result = dao.addPolicy(policy); + Assert.assertEquals(200, result.code); + List<PolicyDefinition> assigns = dao.listPolicies(); + Assert.assertEquals(1, assigns.size()); + } + // publishment + { + Publishment publishment = new Publishment(); + publishment.setName("pub-"); + OpResult result = dao.addPublishment(publishment); + Assert.assertEquals(200, result.code); + List<Publishment> assigns = dao.listPublishment(); + Assert.assertEquals(1, assigns.size()); + } + // publishmentType + { + PublishmentType publishmentType = new PublishmentType(); + publishmentType.setType("KAFKA"); + OpResult result = dao.addPublishmentType(publishmentType); + Assert.assertEquals(200, result.code); + List<PublishmentType> assigns = dao.listPublishmentType(); + Assert.assertEquals(1, assigns.size()); + } + + // schedule state + { + ScheduleState state = new ScheduleState(); + state.setVersion("001"); + state.setScheduleTimeMillis(3000); + state.setCode(200); + OpResult result = dao.addScheduleState(state); + Assert.assertEquals(200, result.code); + + Thread.sleep(1000); + + state = new ScheduleState(); + state.setScheduleTimeMillis(3000); + state.setVersion("002"); + state.setCode(201); + result = dao.addScheduleState(state); + Assert.assertEquals(200, result.code); + + ScheduleState getState = dao.getScheduleState(); + Assert.assertEquals(201, getState.getCode()); + } + // stream + { + StreamDefinition stream = new StreamDefinition(); + stream.setStreamId("stream"); + OpResult result = dao.createStream(stream); + Assert.assertEquals(200, result.code); + List<StreamDefinition> assigns = dao.listStreams(); + Assert.assertEquals(1, assigns.size()); + } + // alert + { + AlertPublishEvent alert = new AlertPublishEvent(); + alert.setAlertTimestamp(System.currentTimeMillis()); + alert.setAlertId(UUID.randomUUID().toString()); + OpResult result = dao.addAlertPublishEvent(alert); + Assert.assertEquals(200, result.code); + List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2); + Assert.assertEquals(1, alerts.size()); + } + } + + private void test_addstate() { + ScheduleState state = new ScheduleState(); + state.setVersion("state-" + System.currentTimeMillis()); + state.setGenerateTime(String.valueOf(new Date().getTime())); + OpResult result = dao.addScheduleState(state); + Assert.assertEquals(200, result.code); + } + + @Test + public void test_readCurrentState() { + test_addstate(); + ScheduleState state = dao.getScheduleState(); + Assert.assertNotNull(state); + + System.out.println(state.getVersion()); + System.out.println(state.getGenerateTime()); + } + + private void test_addCompleteScheduleState() { + Long timestamp = System.currentTimeMillis(); + String version = "state-" + timestamp; + + // SpoutSpec + Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>(); + SpoutSpec spoutSpec1 = new SpoutSpec(); + String topologyId1 = "testUnitTopology1_" + timestamp; + spoutSpec1.setTopologyId(topologyId1); + + Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>(); + Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata(); + kafka2TupleMetadata.setType("KAFKA"); + kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata); + spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap); + + Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>(); + List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>(); + StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(); + List<StreamRepartitionStrategy> groupingStrategies = new ArrayList(); + StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy(); + streamRepartitionStrategy.setStartSequence(4); + groupingStrategies.add(streamRepartitionStrategy); + streamRepartitionMetadata.setGroupingStrategies(groupingStrategies); + StreamRepartitionMetadataList.add(streamRepartitionMetadata); + streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList); + spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap); + spoutSpecsMap.put(topologyId1, spoutSpec1); + + SpoutSpec spoutSpec2 = new SpoutSpec(); + String topologyId2 = "testUnitTopology2_" + timestamp; + spoutSpec2.setTopologyId(topologyId2); + spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap); + spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap); + spoutSpecsMap.put(topologyId2, spoutSpec2); + + // Alert Spec + Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>(); + alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1)); + + // GroupSpec + Map<String, RouterSpec> groupSpecsMap = new HashMap<>(); + groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1)); + + // PublishSpec + Map<String, PublishSpec> pubMap = new HashMap<>(); + pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt")); + + // Policy Snapshots + Collection<PolicyDefinition> policySnapshots = new ArrayList<>(); + PolicyDefinition policy = new PolicyDefinition(); + policy.setName("testPolicyDefinition"); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00"); + def.setType("absencealert"); + policy.setDefinition(def); + policySnapshots.add(policy); + + // Stream Snapshots + Collection<StreamDefinition> streams = new ArrayList<>(); + StreamDefinition stream = new StreamDefinition(); + stream.setStreamId("testStream"); + streams.add(stream); + + // Monitored Streams + Collection<MonitoredStream> monitoredStreams = new ArrayList<>(); + StreamPartition partition = new StreamPartition(); + partition.setType(StreamPartition.Type.GLOBAL); + partition.setStreamId("s1"); + partition.setColumns(Arrays.asList("f1", "f2")); + StreamGroup sg = new StreamGroup(); + sg.addStreamPartition(partition); + MonitoredStream monitoredStream = new MonitoredStream(sg); + monitoredStreams.add(monitoredStream); + + // Assignments + Collection<PolicyAssignment> assignments = new ArrayList<>(); + assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp)); + + ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap, + assignments, monitoredStreams, policySnapshots, streams); + + OpResult result = dao.addScheduleState(state); + Assert.assertEquals(200, result.code); + } + + @Test + public void test_readCompleteScheduleState() { + test_addCompleteScheduleState(); + + ScheduleState state = dao.getScheduleState(); + Assert.assertNotNull(state); + Assert.assertEquals(2, state.getSpoutSpecs().size()); + Assert.assertEquals(1, state.getAlertSpecs().size()); + Assert.assertEquals(1, state.getGroupSpecs().size()); + Assert.assertEquals(1, state.getPublishSpecs().size()); + Assert.assertEquals(1, state.getPolicySnapshots().size()); + Assert.assertEquals(1, state.getStreamSnapshots().size()); + Assert.assertEquals(1, state.getMonitoredStreams().size()); + Assert.assertEquals(1, state.getAssignments().size()); + + + System.out.println(state.getVersion()); + System.out.println(state.getGenerateTime()); + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java deleted file mode 100644 index 840f4a7..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.alert.resource.impl; - -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl; -import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.LoggerFactory; - -/** - * @since May 1, 2016 - */ -public class InMemoryTest { - - private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load()); - - @Test - public void test_AddPolicy() { - - LoggerFactory.getLogger(InMemoryTest.class); - - MetadataDaoFactory.getInstance().getMetadataDao(); - - PolicyDefinition pd = new PolicyDefinition(); - pd.setName("pd1"); - dao.addPolicy(pd); - - Assert.assertEquals(1, dao.listPolicies().size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java deleted file mode 100644 index 158c0c2..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.service.alert.resource.impl; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.PublishmentType; -import org.apache.eagle.alert.engine.coordinator.StreamingCluster; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.MetadataUtils; -import org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl; -import org.apache.eagle.alert.metadata.resource.OpResult; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Date; -import java.util.List; - -public class JdbcImplTest { - private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class); - static IMetadataDao dao; - - @BeforeClass - public static void setup() { - System.setProperty("config.resource", "/application-mysql.conf"); - ConfigFactory.invalidateCaches(); - Config config = ConfigFactory.load(); - dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA)); - } - - @AfterClass - public static void teardown() { - if (dao != null) { - try { - dao.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private String TOPO_NAME = "topoName"; - - @Ignore - @Test - public void test_apis() { - // publishment - { - Publishment publishment = new Publishment(); - publishment.setName("pub-"); - OpResult result = dao.addPublishment(publishment); - Assert.assertEquals(200, result.code); - List<Publishment> assigns = dao.listPublishment(); - Assert.assertEquals(1, assigns.size()); - result = dao.removePublishment("pub-"); - Assert.assertTrue(200 == result.code); - } - // topology - { - OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5)); - System.out.println(result.message); - Assert.assertEquals(200, result.code); - List<Topology> topos = dao.listTopologies(); - Assert.assertEquals(1, topos.size()); - // add again: replace existing one - result = dao.addTopology(new Topology(TOPO_NAME, 4, 5)); - topos = dao.listTopologies(); - Assert.assertEquals(1, topos.size()); - Assert.assertEquals(TOPO_NAME, topos.get(0).getName()); - Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt()); - } - // assignment - { - PolicyAssignment assignment = new PolicyAssignment(); - assignment.setPolicyName("policy1"); - OpResult result = dao.addAssignment(assignment); - Assert.assertEquals(200, result.code); - List<PolicyAssignment> assigns = dao.listAssignments(); - Assert.assertEquals(1, assigns.size()); - } - // cluster - { - StreamingCluster cluster = new StreamingCluster(); - cluster.setName("dd"); - OpResult result = dao.addCluster(cluster); - Assert.assertEquals(200, result.code); - List<StreamingCluster> assigns = dao.listClusters(); - Assert.assertEquals(1, assigns.size()); - } - // data source - { - Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata(); - dataSource.setName("ds"); - OpResult result = dao.addDataSource(dataSource); - Assert.assertEquals(200, result.code); - List<Kafka2TupleMetadata> assigns = dao.listDataSources(); - Assert.assertEquals(1, assigns.size()); - } - // policy - { - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("ds"); - OpResult result = dao.addPolicy(policy); - Assert.assertEquals(200, result.code); - List<PolicyDefinition> assigns = dao.listPolicies(); - Assert.assertEquals(1, assigns.size()); - } - - // publishmentType - { - PublishmentType publishmentType = new PublishmentType(); - publishmentType.setType("KAFKA"); - OpResult result = dao.addPublishmentType(publishmentType); - Assert.assertEquals(200, result.code); - List<PublishmentType> assigns = dao.listPublishmentType(); - Assert.assertEquals(1, assigns.size()); - } - } - - private void test_addstate() { - ScheduleState state = new ScheduleState(); - String versionId = "state-" + System.currentTimeMillis(); - state.setVersion(versionId); - state.setGenerateTime(String.valueOf(new Date().getTime())); - OpResult result = dao.addScheduleState(state); - Assert.assertEquals(200, result.code); - state = dao.getScheduleState(); - Assert.assertEquals(state.getVersion(), versionId); - } - - @Ignore - @Test - public void test_readCurrentState() { - test_addstate(); - ScheduleState state = dao.getScheduleState(); - Assert.assertNotNull(state); - - LOG.debug(state.getVersion()); - LOG.debug(state.getGenerateTime()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java deleted file mode 100644 index 4328be3..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.service.alert.resource.impl; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import de.flapdoodle.embed.mongo.MongodExecutable; -import de.flapdoodle.embed.mongo.MongodProcess; -import de.flapdoodle.embed.mongo.MongodStarter; -import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; -import de.flapdoodle.embed.mongo.config.Net; -import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.process.runtime.Network; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.eagle.alert.coordination.model.*; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.metadata.IMetadataDao; -import org.apache.eagle.alert.metadata.MetadataUtils; -import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl; -import org.apache.eagle.alert.metadata.resource.OpResult; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * @since May 1, 2016 - */ -public class MongoImplTest { - private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class); - static IMetadataDao dao; - - private static MongodExecutable mongodExe; - private static MongodProcess mongod; - - public static void before() { - try { - MongodStarter starter = MongodStarter.getDefaultInstance(); - mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1) - .net(new Net(27017, Network.localhostIsIPv6())).build()); - mongod = mongodExe.start(); - } catch (Exception e) { - LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e); - } - } - - @BeforeClass - public static void setup() { - before(); - - System.setProperty("config.resource", "/application-mongo.conf"); - ConfigFactory.invalidateCaches(); - Config config = ConfigFactory.load(); - dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA)); - - } - - @AfterClass - public static void teardown() { - if (mongod != null) { - try { - mongod.stop(); - } catch (IllegalStateException e) { - // catch this exception for the unstable stopping mongodb - // reason: the exception is usually thrown out with below message format when stop() returns null value, - // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying - // the process ultimately - if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) { - // if matches, do nothing, just ignore the exception - } else { - LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e))); - } - } - mongodExe.stop(); - } - } - - private String TOPO_NAME = "topoName"; - - @Test - public void test_apis() throws Exception { - // topology - { - OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5)); - System.out.println(result.message); - Assert.assertEquals(200, result.code); - List<Topology> topos = dao.listTopologies(); - Assert.assertEquals(1, topos.size()); - - result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5)); - topos = dao.listTopologies(); - Assert.assertEquals(2, topos.size()); - // add again: replace existing one - result = dao.addTopology(new Topology(TOPO_NAME, 4, 5)); - topos = dao.listTopologies(); - Assert.assertEquals(2, topos.size()); - Assert.assertEquals(TOPO_NAME, topos.get(0).getName()); - Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt()); - } - // assignment - { - PolicyAssignment assignment = new PolicyAssignment(); - assignment.setPolicyName("policy1"); - OpResult result = dao.addAssignment(assignment); - Assert.assertEquals(200, result.code); - List<PolicyAssignment> assigns = dao.listAssignments(); - Assert.assertEquals(1, assigns.size()); - } - // cluster - { - StreamingCluster cluster = new StreamingCluster(); - cluster.setName("dd"); - OpResult result = dao.addCluster(cluster); - Assert.assertEquals(200, result.code); - List<StreamingCluster> assigns = dao.listClusters(); - Assert.assertEquals(1, assigns.size()); - } - // data source - { - Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata(); - dataSource.setName("ds"); - OpResult result = dao.addDataSource(dataSource); - Assert.assertEquals(200, result.code); - List<Kafka2TupleMetadata> assigns = dao.listDataSources(); - Assert.assertEquals(1, assigns.size()); - } - // policy - { - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("ds"); - OpResult result = dao.addPolicy(policy); - Assert.assertEquals(200, result.code); - List<PolicyDefinition> assigns = dao.listPolicies(); - Assert.assertEquals(1, assigns.size()); - } - // publishment - { - Publishment publishment = new Publishment(); - publishment.setName("pub-"); - OpResult result = dao.addPublishment(publishment); - Assert.assertEquals(200, result.code); - List<Publishment> assigns = dao.listPublishment(); - Assert.assertEquals(1, assigns.size()); - } - // publishmentType - { - PublishmentType publishmentType = new PublishmentType(); - publishmentType.setType("KAFKA"); - OpResult result = dao.addPublishmentType(publishmentType); - Assert.assertEquals(200, result.code); - List<PublishmentType> assigns = dao.listPublishmentType(); - Assert.assertEquals(1, assigns.size()); - } - - // schedule state - { - ScheduleState state = new ScheduleState(); - state.setVersion("001"); - state.setScheduleTimeMillis(3000); - state.setCode(200); - OpResult result = dao.addScheduleState(state); - Assert.assertEquals(200, result.code); - - Thread.sleep(1000); - - state = new ScheduleState(); - state.setScheduleTimeMillis(3000); - state.setVersion("002"); - state.setCode(201); - result = dao.addScheduleState(state); - Assert.assertEquals(200, result.code); - - ScheduleState getState = dao.getScheduleState(); - Assert.assertEquals(201, getState.getCode()); - } - // stream - { - StreamDefinition stream = new StreamDefinition(); - stream.setStreamId("stream"); - OpResult result = dao.createStream(stream); - Assert.assertEquals(200, result.code); - List<StreamDefinition> assigns = dao.listStreams(); - Assert.assertEquals(1, assigns.size()); - } - // alert - { - AlertPublishEvent alert = new AlertPublishEvent(); - alert.setAlertTimestamp(System.currentTimeMillis()); - alert.setAlertId(UUID.randomUUID().toString()); - OpResult result = dao.addAlertPublishEvent(alert); - Assert.assertEquals(200, result.code); - List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2); - Assert.assertEquals(1, alerts.size()); - } - } - - private void test_addstate() { - ScheduleState state = new ScheduleState(); - state.setVersion("state-" + System.currentTimeMillis()); - state.setGenerateTime(String.valueOf(new Date().getTime())); - OpResult result = dao.addScheduleState(state); - Assert.assertEquals(200, result.code); - } - - @Test - public void test_readCurrentState() { - test_addstate(); - ScheduleState state = dao.getScheduleState(); - Assert.assertNotNull(state); - - System.out.println(state.getVersion()); - System.out.println(state.getGenerateTime()); - } - - private void test_addCompleteScheduleState() { - Long timestamp = System.currentTimeMillis(); - String version = "state-" + timestamp; - - // SpoutSpec - Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>(); - SpoutSpec spoutSpec1 = new SpoutSpec(); - String topologyId1 = "testUnitTopology1_" + timestamp; - spoutSpec1.setTopologyId(topologyId1); - - Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>(); - Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata(); - kafka2TupleMetadata.setType("KAFKA"); - kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata); - spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap); - - Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>(); - List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>(); - StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(); - List<StreamRepartitionStrategy> groupingStrategies = new ArrayList(); - StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy(); - streamRepartitionStrategy.setStartSequence(4); - groupingStrategies.add(streamRepartitionStrategy); - streamRepartitionMetadata.setGroupingStrategies(groupingStrategies); - StreamRepartitionMetadataList.add(streamRepartitionMetadata); - streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList); - spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap); - spoutSpecsMap.put(topologyId1, spoutSpec1); - - SpoutSpec spoutSpec2 = new SpoutSpec(); - String topologyId2 = "testUnitTopology2_" + timestamp; - spoutSpec2.setTopologyId(topologyId2); - spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap); - spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap); - spoutSpecsMap.put(topologyId2, spoutSpec2); - - // Alert Spec - Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>(); - alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1)); - - // GroupSpec - Map<String, RouterSpec> groupSpecsMap = new HashMap<>(); - groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1)); - - // PublishSpec - Map<String, PublishSpec> pubMap = new HashMap<>(); - pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt")); - - // Policy Snapshots - Collection<PolicyDefinition> policySnapshots = new ArrayList<>(); - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("testPolicyDefinition"); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00"); - def.setType("absencealert"); - policy.setDefinition(def); - policySnapshots.add(policy); - - // Stream Snapshots - Collection<StreamDefinition> streams = new ArrayList<>(); - StreamDefinition stream = new StreamDefinition(); - stream.setStreamId("testStream"); - streams.add(stream); - - // Monitored Streams - Collection<MonitoredStream> monitoredStreams = new ArrayList<>(); - StreamPartition partition = new StreamPartition(); - partition.setType(StreamPartition.Type.GLOBAL); - partition.setStreamId("s1"); - partition.setColumns(Arrays.asList("f1", "f2")); - StreamGroup sg = new StreamGroup(); - sg.addStreamPartition(partition); - MonitoredStream monitoredStream = new MonitoredStream(sg); - monitoredStreams.add(monitoredStream); - - // Assignments - Collection<PolicyAssignment> assignments = new ArrayList<>(); - assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp)); - - ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap, - assignments, monitoredStreams, policySnapshots, streams); - - OpResult result = dao.addScheduleState(state); - Assert.assertEquals(200, result.code); - } - - @Test - public void test_readCompleteScheduleState() { - test_addCompleteScheduleState(); - - ScheduleState state = dao.getScheduleState(); - Assert.assertNotNull(state); - Assert.assertEquals(2, state.getSpoutSpecs().size()); - Assert.assertEquals(1, state.getAlertSpecs().size()); - Assert.assertEquals(1, state.getGroupSpecs().size()); - Assert.assertEquals(1, state.getPublishSpecs().size()); - Assert.assertEquals(1, state.getPolicySnapshots().size()); - Assert.assertEquals(1, state.getStreamSnapshots().size()); - Assert.assertEquals(1, state.getMonitoredStreams().size()); - Assert.assertEquals(1, state.getAssignments().size()); - - - System.out.println(state.getVersion()); - System.out.println(state.getGenerateTime()); - - - } -}