Repository: incubator-eagle Updated Branches: refs/heads/master 5ef6c183f -> 6a7842fc5
[EAGLE-829] refactor publishmentType to align with PolicyDefinition https://issues.apache.org/jira/browse/EAGLE-829 Author: Zhao, Qingwen <qingwz...@apache.org> Closes #717 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6a7842fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6a7842fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6a7842fc Branch: refs/heads/master Commit: 6a7842fc554862fb2e2bf53a2133ac7804876cf0 Parents: 5ef6c18 Author: Zhao, Qingwen <qingwz...@apache.org> Authored: Wed Dec 7 11:34:22 2016 +0800 Committer: Zhao, Qingwen <qingwz...@apache.org> Committed: Wed Dec 7 11:34:22 2016 +0800 ---------------------------------------------------------------------- ...e.alert.app.AlertUnitTopologyAppProvider.xml | 6 +++++ .../engine/coordinator/PublishmentType.java | 24 +++++++++----------- .../engine/coordinator/PublishmentTypeTest.java | 8 +++---- .../metadata/resource/MetadataResource.java | 6 ++--- .../eagle/alert/metadata/MetadataUtils.java | 3 --- .../metadata/impl/JdbcMetadataDaoImpl.java | 4 ++-- .../metadata/impl/MongoMetadataDaoImpl.java | 10 +------- .../eagle/alert/metadata/impl/JdbcImplTest.java | 4 ++-- .../alert/metadata/impl/MongoImplTest.java | 2 +- .../src/main/bin/createTables.sql | 8 +++---- .../impl/IPMaskTopologyRackResolver.java | 4 ++-- .../topology/storm/TopologyDataPersistBolt.java | 20 +++++++++------- 12 files changed, 48 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml index 8ecbe8c..b6ad59b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml @@ -96,6 +96,12 @@ <required>false</required> </property> <property> + <name>spout.stormKafkaTransactionZkQuorum</name> + <displayName>Spout Transaction Zookeeper Quorum</displayName> + <description>Required if reuse broker zookeeper is false</description> + <required>false</required> + </property> + <property> <name>spout.stormKafkaTransactionZkPath</name> <displayName>Spout Transaction Zookeeper Path</displayName> <value>/consumers</value> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index 2718cfe..5bd15bc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -27,27 +27,25 @@ import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { - + private String name; private String type; - private String className; private String description; - private List<Map<String, String>> fields; - public String getType() { - return type; + public String getName() { + return name; } - public void setType(String type) { - this.type = type; + public void setName(String name) { + this.name = name; } - public String getClassName() { - return className; + public String getType() { + return type; } - public void setClassName(String className) { - this.className = className; + public void setType(String type) { + this.type = type; } public String getDescription() { @@ -70,7 +68,7 @@ public class PublishmentType { public boolean equals(Object obj) { if (obj instanceof PublishmentType) { PublishmentType p = (PublishmentType) obj; - return (Objects.equals(className, p.getClassName()) + return (Objects.equals(name, p.name) && Objects.equals(type, p.type) && Objects.equals(description, p.getDescription()) && Objects.equals(fields, p.getFields())); @@ -81,7 +79,7 @@ public class PublishmentType { @Override public int hashCode() { return new HashCodeBuilder() - .append(className) + .append(name) .append(type) .append(description) .append(fields) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java index 957ac9a..fcd856a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java @@ -24,13 +24,13 @@ public class PublishmentTypeTest { @Test public void testPublishmentType() { PublishmentType publishmentType = new PublishmentType(); - publishmentType.setType("KAFKA"); - publishmentType.setClassName("setClassName"); + publishmentType.setName("KAFKA"); + publishmentType.setType("setClassName"); publishmentType.setDescription("setDescription"); PublishmentType publishmentType1 = new PublishmentType(); - publishmentType1.setType("KAFKA"); - publishmentType1.setClassName("setClassName"); + publishmentType1.setName("KAFKA"); + publishmentType1.setType("setClassName"); publishmentType1.setDescription("setDescription"); Assert.assertFalse(publishmentType.equals(new String(""))); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java index c814252..7bfd2c3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java @@ -368,10 +368,10 @@ public class MetadataResource { return results; } - @Path("/publishmentTypes/{pubType}") + @Path("/publishmentTypes/{name}") @DELETE - public OpResult removePublishmentType(@PathParam("pubType") String pubType) { - return dao.removePublishmentType(pubType); + public OpResult removePublishmentType(@PathParam("name") String name) { + return dao.removePublishmentType(name); } @Path("/publishmentTypes") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java index be22280..658703a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java @@ -50,9 +50,6 @@ public class MetadataUtils { if (t instanceof StreamDefinition) { return ((StreamDefinition) t).getStreamId(); } - if (t instanceof PublishmentType) { - return ((PublishmentType) t).getType(); - } if (t instanceof PolicyAssignment) { return ((PolicyAssignment) t).getPolicyName(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index b522451..e0b5c9d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -236,8 +236,8 @@ public class JdbcMetadataDaoImpl implements IMetadataDao { } @Override - public OpResult removePublishmentType(String pubType) { - return handler.removeById(PublishmentType.class.getSimpleName(), pubType); + public OpResult removePublishmentType(String name) { + return handler.removeById(PublishmentType.class.getSimpleName(), name); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java index fca5be6..e747d23 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java @@ -155,14 +155,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao { publishment.createIndex(doc, io); topologies = db.getCollection("topologies"); topologies.createIndex(doc, io); - publishmentType = db.getCollection("publishmentTypes"); - { - IndexOptions io1 = new IndexOptions().background(true).unique(true).name("pubTypeIndex"); - BsonDocument doc1 = new BsonDocument(); - doc1.append("type", new BsonInt32(1)); - publishmentType.createIndex(doc1, io1); - } + publishmentType.createIndex(doc, io); alerts = db.getCollection("alerts"); { @@ -225,8 +219,6 @@ public class MongoMetadataDaoImpl implements IMetadataDao { BsonDocument filter = new BsonDocument(); if (t instanceof StreamDefinition) { filter.append("streamId", new BsonString(MetadataUtils.getKey(t))); - } else if (t instanceof PublishmentType) { - filter.append("type", new BsonString(MetadataUtils.getKey(t))); } else if (t instanceof AlertPublishEvent) { filter.append("alertId", new BsonString(MetadataUtils.getKey(t))); } else { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java index a2c1451..d718632 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java @@ -134,8 +134,8 @@ public class JdbcImplTest { // publishmentType { PublishmentType publishmentType = new PublishmentType(); - publishmentType.setType("KAFKA"); - publishmentType.setClassName("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher"); + publishmentType.setName("KAFKA"); + publishmentType.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher"); List<Map<String, String>> fields = new ArrayList<>(); Map<String, String> field1 = new HashMap<>(); field1.put("name", "kafka_broker"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java index 3b3ddf9..cd9a0a9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java @@ -167,7 +167,7 @@ public class MongoImplTest { // publishmentType { PublishmentType publishmentType = new PublishmentType(); - publishmentType.setType("KAFKA"); + publishmentType.setName("KAFKA"); OpResult result = dao.addPublishmentType(publishmentType); Assert.assertEquals(200, result.code); List<PublishmentType> assigns = dao.listPublishmentType(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-server-assembly/src/main/bin/createTables.sql ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/bin/createTables.sql b/eagle-server-assembly/src/main/bin/createTables.sql index b6e635a..f80a7ea 100644 --- a/eagle-server-assembly/src/main/bin/createTables.sql +++ b/eagle-server-assembly/src/main/bin/createTables.sql @@ -131,7 +131,7 @@ CREATE TABLE IF NOT EXISTS alert_event ( ); INSERT INTO publishment_type(id, content) VALUES -('Kafka', '{"type":"Kafka","className":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'), -('Email', '{"type":"Email","className":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'), -('Slack', '{"type":"Slack","className":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'), -('Storage', '{"type":"Storage","className":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'); +('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'), +('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'), +('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'), +('Storage', '{"name":"Storage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java index 99a44a6..b11394c 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java @@ -44,12 +44,12 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver { @Override public String resolve(String hostname) { - String result = null; + String result = "null"; try { InetAddress address = InetAddress.getByName(hostname); result = "rack" + (int) (address.getAddress()[rackPos] & 0xff); } catch (UnknownHostException e) { - LOG.error(e.getMessage(), e); + //LOG.warn("UnknownHostException: {}", hostname); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6a7842fc/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java index 627ebe3..1e7acb8 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java @@ -87,7 +87,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt { GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send(); if (response.isSuccess() && response.getObj() != null) { for (TopologyBaseAPIEntity entity : response.getObj()) { - if (result.getSlaveNodes().size() > 0 && !availableHostnames.contains(generateKey(entity))) { + if (!availableHostnames.isEmpty() && !availableHostnames.contains(generateKey(entity))) { entitiesForDeletion.add(entity); } } @@ -103,13 +103,17 @@ public class TopologyDataPersistBolt extends BaseRichBolt { } private void filterEntitiesToWrite(TopologyEntityParserResult result, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entitiesToWrite) { - for (TopologyBaseAPIEntity entity : result.getMasterNodes()) { - availableHostnames.add(generateKey(entity)); - entitiesToWrite.add(entity); - } - for (TopologyBaseAPIEntity entity : result.getSlaveNodes()) { - availableHostnames.add(generateKey(entity)); - entitiesToWrite.add(entity); + if (!result.getSlaveNodes().isEmpty()) { + for (TopologyBaseAPIEntity entity : result.getMasterNodes()) { + availableHostnames.add(generateKey(entity)); + entitiesToWrite.add(entity); + } + for (TopologyBaseAPIEntity entity : result.getSlaveNodes()) { + availableHostnames.add(generateKey(entity)); + entitiesToWrite.add(entity); + } + } else { + LOG.warn("Data is in an inconsistent state"); } }