Repository: eagle Updated Branches: refs/heads/master dd16620e2 -> d5dce2b34
[EAGLE-950] Add REST Stream Proxy for easy integration https://issues.apache.org/jira/browse/EAGLE-950 Add API for `POST /streams/{STREAM_ID}` to populate real-time data into stream through REST API, so that monitoring cases could easily integrate with eagle by some native tools like `curl` or scripts For example: * Step 1. Support service team to collect metric with SINGLE COMMAND in Ad-Hoc monitoring scripts like `curl` curl --basic -u $USERNAME:$PASSWORD \ -H "Content-Type: application/json" -X POST \ http://localhost:9090/rest/streams/HADOOP_JMX_METRIC_STREAM_SANDBOX \ -d '[ { "timestamp": 1489318310626, "metric": "hadoop.metric.sample", "component": "namenode", "site": "sandbox", "value": 55044096.0, "host": "sandbox" },{ "timestamp": 1489318320626, "metric": "hadoop.metric.sample", "component": "namenode", "site": "sandbox", "value": 55044096.0, "host": "sandbox" },{ "timestamp": 1489318330626, "metric": "hadoop.metric.sample", "component": "namenode", "site": "sandbox", "value": 55044096.0, "host": "sandbox" } ]' * Step 2: Define Alerting Policies * Step 3: Customized Metric Visualization Author: Hao Chen <[email protected]> Closes #865 from haoch/ImproveMetricMonitor. 
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d5dce2b3 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d5dce2b3 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d5dce2b3 Branch: refs/heads/master Commit: d5dce2b3498960417da498895e3f2d9c4241f66b Parents: dd16620 Author: Hao Chen <[email protected]> Authored: Sun Mar 12 20:01:15 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Sun Mar 12 20:01:15 2017 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/StreamColumn.java | 2 +- .../engine/coordinator/StreamDefinition.java | 5 +- .../alert/utils/StreamValidationException.java | 39 ++++++ .../eagle/alert/utils/StreamValidator.java | 57 ++++++++ .../coordinator/StreamDefinitionTest.java | 6 +- .../alert/engine/model/StreamEventTest.java | 13 ++ .../eagle/app/messaging/StreamRecord.java | 30 +++++ .../apache/eagle/app/test/KafkaTestServer.java | 34 +++++ .../eagle/app/test/KafkaTestServerImpl.java | 79 ++++++++++++ .../eagle-app/eagle-app-streamproxy/pom.xml | 39 ++++++ .../proxy/exception/StreamProxyException.java | 39 ++++++ .../stream/StreamConfigUpdateListener.java | 36 ++++++ .../stream/StreamMetadataUpdateService.java | 27 ++++ .../eagle/app/proxy/stream/StreamProxy.java | 27 ++++ .../app/proxy/stream/StreamProxyManager.java | 31 +++++ .../app/proxy/stream/StreamProxyProducer.java | 28 ++++ .../app/proxy/stream/StreamProxyResource.java | 85 ++++++++++++ .../impl/KafkaStreamProxyProducerImpl.java | 88 +++++++++++++ .../impl/StreamMetadataUpdateServiceImpl.java | 115 +++++++++++++++++ .../app/proxy/stream/impl/StreamProxyImpl.java | 77 +++++++++++ .../stream/impl/StreamProxyManagerImpl.java | 129 +++++++++++++++++++ .../eagle/app/proxy/stream/StreamProxyTest.java | 76 +++++++++++ .../src/test/resources/application.conf | 35 +++++ eagle-core/eagle-app/pom.xml | 1 + .../apache/eagle/common/rest/RESTResponse.java | 3 
+- .../eagle/metric/HadoopMetricMonitorApp.java | 4 +- ...le.metric.HadoopMetricMonitorAppProdiver.xml | 6 +- eagle-server/pom.xml | 11 ++ .../server/security/BasicAuthRequestFilter.java | 14 +- eagle-server/src/main/webapp/app/dev/index.html | 40 +++--- .../webapp/app/dev/partials/alert/list.html | 2 +- .../webapp/app/dev/public/js/ctrls/alertCtrl.js | 6 +- .../webapp/app/dev/public/js/ctrls/mainCtrl.js | 2 +- 33 files changed, 1144 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java index ba736fe..abd9dc5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java @@ -34,7 +34,7 @@ public class StreamColumn implements Serializable { private String name; private Type type; private Object defaultValue; - private boolean required; + private boolean required = true; private String description; private String nodataExpression; http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java index 73d3991..af9d137 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java @@ -42,7 +42,7 @@ public class StreamDefinition implements Serializable { private String description; // Is validateable or not - private boolean validate; + private boolean validate = true; // Is timeseries-based stream or not private boolean timeseries; @@ -52,7 +52,7 @@ public class StreamDefinition implements Serializable { // Stream data source ID private String dataSource; - private String group = "Default"; + private String group = "global"; // private String streamSource; @@ -125,6 +125,7 @@ public class StreamDefinition implements Serializable { this.description = description; } + @Deprecated public boolean isValidate() { return validate; } http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java new file mode 100644 index 0000000..2f08506 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.utils; + +public class StreamValidationException extends Exception { + public StreamValidationException() { + super(); + } + + public StreamValidationException(String message) { + super(message); + } + + public StreamValidationException(String message, Throwable cause) { + super(message, cause); + } + + public StreamValidationException(Throwable cause) { + super(cause); + } + + protected StreamValidationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java new file mode 100644 index 0000000..a1f64d9 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamValidator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.utils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class StreamValidator { + private final StreamDefinition streamDefinition; + private final Map<String, StreamColumn> streamColumnMap; + + public StreamValidator(StreamDefinition streamDefinition) { + this.streamDefinition = streamDefinition; + this.streamColumnMap = new HashMap<>(); + for (StreamColumn column : this.streamDefinition.getColumns()) { + streamColumnMap.put(column.getName(), column); + } + } + + public void validateMap(Map<String, Object> event) throws StreamValidationException { + final List<String> errors = new LinkedList<>(); + this.streamDefinition.getColumns().forEach((column -> { + if (column.isRequired() && !event.containsKey(column.getName())) { + errors.add("[" + 
column.getName() + "]: required but absent"); + } + })); + for (Object eventKey : event.keySet()) { + if (!streamColumnMap.containsKey(eventKey)) { + errors.add("[" + eventKey + "]: invalid column"); + } + } + + if (errors.size() > 0) { + throw new StreamValidationException(errors.size() + " validation errors: " + StringUtils.join(errors.toArray(), "; ")); + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java index c0d9213..85ef5dc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java @@ -35,15 +35,15 @@ public class StreamDefinitionTest { streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); StreamDefinition streamDefinition = new StreamDefinition(); - Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", streamDefinition.toString()); + Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[]", streamDefinition.toString()); streamDefinition.setColumns(streamColumns); Assert.assertEquals(3, streamDefinition.getColumnIndex("data")); Assert.assertEquals(-1, streamDefinition.getColumnIndex("DATA")); 
Assert.assertEquals(-1, streamDefinition.getColumnIndex("isYhd")); - Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition.toString()); + Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[true], nodataExpression=[null]]", streamDefinition.toString()); StreamDefinition streamDefinition1 = streamDefinition.copy(); - Assert.assertEquals("StreamDefinition[group=Default, streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], 
type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString()); + Assert.assertEquals("StreamDefinition[group=global, streamId=null, dataSource=null, description=null, validate=true, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[true], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[true], nodataExpression=[null]]", streamDefinition1.toString()); Assert.assertTrue(streamDefinition1.equals(streamDefinition)); Assert.assertFalse(streamDefinition1 == streamDefinition); http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java index 6543a8d..547ef75 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java @@ -18,6 +18,8 @@ package org.apache.eagle.alert.engine.model; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import 
org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.utils.StreamValidationException; +import org.apache.eagle.alert.utils.StreamValidator; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -130,6 +132,17 @@ public class StreamEventTest { } @Test + public void testStreamValidator() throws StreamValidationException { + StreamDefinition streamDefinition = mockStreamDefinition("TEST_STREAM"); + StreamValidator validator = new StreamValidator(streamDefinition); + thrown.expect(StreamValidationException.class); + validator.validateMap(new HashMap<String, Object>() {{ + put("name", "cpu"); + put("value", 60.0); + }}); + } + + @Test public void testStreamEvent3() { List<StreamColumn> streamColumns = new ArrayList<>(); streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java new file mode 100644 index 0000000..f76cdbf --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/StreamRecord.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class StreamRecord extends HashMap<String,Object> implements Serializable { + public StreamRecord() { + } + + public StreamRecord(Map<String,Object> event) { + this.putAll(event); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java new file mode 100644 index 0000000..9744350 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.test; + +import java.io.File; +import java.io.IOException; + +public interface KafkaTestServer { + void start() throws Exception; + + void stop() throws IOException; + + int getZookeeperPort(); + + int getKafkaBrokerPort(); + + static KafkaTestServer create(File logDir) { + return new KafkaTestServerImpl(logDir); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java new file mode 100644 index 0000000..30ce67c --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/KafkaTestServerImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.test; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +class KafkaTestServerImpl implements KafkaTestServer { + + private final File logDir; + private TestingServer zkServer; + private CuratorFramework curatorClient; + private KafkaServerStartable kafkaServer; + private int kafkaPort = InstanceSpec.getRandomPort(); + private int zookeeperPort = InstanceSpec.getRandomPort(); + + public KafkaTestServerImpl(File logDir) { + this.logDir = logDir; + } + + @Override + public void start() throws Exception { + zkServer = new TestingServer(zookeeperPort, logDir); + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + curatorClient = CuratorFrameworkFactory.newClient(zkServer.getConnectString(), retryPolicy); + curatorClient.start(); + + Properties props = new Properties(); + + props.setProperty("zookeeper.connect", zkServer.getConnectString()); + props.setProperty("broker.id", "0"); + props.setProperty("port", "" + kafkaPort); + props.setProperty("log.dirs", logDir.getAbsolutePath()); + props.setProperty("auto.create.topics.enable", "true"); + + kafkaServer = new KafkaServerStartable(new KafkaConfig(props)); + 
kafkaServer.startup(); + } + + @Override + public void stop() throws IOException { + kafkaServer.shutdown(); + curatorClient.close(); + zkServer.close(); + } + + @Override + public int getZookeeperPort() { + return this.zookeeperPort; + } + + @Override + public int getKafkaBrokerPort() { + return this.kafkaPort; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml b/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml new file mode 100644 index 0000000..8672d7a --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ ~ + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>eagle-app-parent</artifactId> + <groupId>org.apache.eagle</groupId> + <version>0.5.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>eagle-app-streamproxy</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java new file mode 100644 index 0000000..094547d --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/exception/StreamProxyException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.exception; + +public class StreamProxyException extends Exception { + public StreamProxyException() { + super(); + } + + public StreamProxyException(String message) { + super(message); + } + + public StreamProxyException(String message, Throwable cause) { + super(message, cause); + } + + public StreamProxyException(Throwable cause) { + super(cause); + } + + protected StreamProxyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java new file mode 100644 index 0000000..524569e --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamConfigUpdateListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream; + +import org.apache.eagle.metadata.model.StreamDesc; + +public interface StreamConfigUpdateListener { + /** + * onStreamAdded listener callback method. + */ + void onStreamAdded(StreamDesc streamDesc); + + /** + * onStreamChanged listener callback method. + */ + void onStreamChanged(StreamDesc streamDesc); + + /** + * onStreamRemoved listener callback method. + */ + void onStreamRemoved(StreamDesc streamDesc); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java new file mode 100644 index 0000000..8ba0978 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamMetadataUpdateService.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream; + +import org.apache.eagle.metadata.model.StreamDesc; + +import java.util.Map; + +public interface StreamMetadataUpdateService extends Runnable { + Map<String, StreamDesc> getStreamDescSnapshot(); + + void shutdown(); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java new file mode 100644 index 0000000..275fd59 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.metadata.model.StreamDesc;
+
+import java.io.IOException;
+
+/**
+ * A {@link StreamProxyProducer} with an explicit lifecycle: it is opened against
+ * one stream descriptor, used to send records, and finally closed.
+ */
+public interface StreamProxy extends StreamProxyProducer {
+    /**
+     * Prepares the proxy for the given stream. StreamProxyImpl uses the
+     * descriptor's stream id, schema and sink config.
+     *
+     * @param streamDesc descriptor of the stream this proxy writes to
+     */
+    void open(StreamDesc streamDesc);
+
+    /**
+     * Releases the resources held by the proxy.
+     *
+     * @throws IOException if the underlying producer fails to close
+     */
+    void close() throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
new file mode 100644
index 0000000..5ee4765
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import com.google.inject.ImplementedBy;
+import org.apache.eagle.app.proxy.exception.StreamProxyException;
+import org.apache.eagle.app.proxy.stream.impl.StreamProxyManagerImpl;
+import org.apache.eagle.metadata.model.StreamDesc;
+
+import java.util.Collection;
+
+/**
+ * Entry point used by the REST resource to look up stream proxies and the
+ * descriptors of the known streams. Guice binds it to
+ * {@link StreamProxyManagerImpl} by default.
+ */
+@ImplementedBy(StreamProxyManagerImpl.class)
+public interface StreamProxyManager {
+    /**
+     * Looks up the proxy that writes to the given stream.
+     *
+     * @param streamId id of the target stream
+     * @return the proxy registered for {@code streamId}
+     * @throws StreamProxyException if the stream is unknown or no proxy has been
+     *                              set up for it (as implemented by StreamProxyManagerImpl)
+     */
+    StreamProxy getStreamProxy(String streamId) throws StreamProxyException;
+
+    /**
+     * Returns the descriptors of all streams currently known to the manager.
+     *
+     * @return the known stream descriptors
+     */
+    Collection<StreamDesc> getAllStreamDesc();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
new file mode 100644
index 0000000..479b8d4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyProducer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.proxy.stream;
+
+import org.apache.eagle.app.messaging.StreamRecord;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+// NOTE(review): Future is not referenced anywhere in this interface - confirm it can be dropped.
+import java.util.concurrent.Future;
+
+/**
+ * Writes batches of {@link StreamRecord}s to a stream sink. Implementations
+ * hold resources and must be closed when no longer needed.
+ */
+public interface StreamProxyProducer extends Closeable {
+    /**
+     * Sends the given records to the sink.
+     *
+     * @param events records to write
+     * @throws IOException if the records cannot be written
+     */
+    void send(List<StreamRecord> events) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
new file mode 100644
index 0000000..1d5e111
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/StreamProxyResource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import org.apache.eagle.app.messaging.StreamRecord; +import org.apache.eagle.common.rest.RESTResponse; +import org.apache.eagle.common.security.RolesAllowed; +import org.apache.eagle.common.security.User; +import org.apache.eagle.metadata.model.StreamDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +@Path("/streams") +public class StreamProxyResource { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyResource.class); + @Inject + private StreamProxyManager proxyManager; + + @GET + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Collection<StreamDesc>> getAllStreamDesc() { + return RESTResponse.async(() -> proxyManager.getAllStreamDesc()).get(); + } + + @POST + @Path("/{streamId}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @RolesAllowed( {User.Role.ADMINISTRATOR, User.Role.APPLICATION}) + public RESTResponse produceEvent(@NotNull List<StreamRecord> records, @PathParam("streamId") String streamId) { + return RESTResponse.async((builder) -> { 
+ try { + Preconditions.checkNotNull(records, "Records is empty"); + proxyManager.getStreamProxy(streamId).send(records); + builder.status(true, Response.Status.OK) + .message(String.format("Successfully wrote %s records into stream %s", records.size(), streamId)); + } catch (Exception e) { + LOGGER.error("Error to write records to stream {}: {}", streamId, e.getMessage(), e); + builder.exception(e) + .status(false, Response.Status.BAD_REQUEST) + .message("Failed to write messages to stream " + streamId + ": " + e.getMessage()); + } + }).get(); + } + + @GET + @Path("/{streamId}") + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse getSingleStreamDesc(@PathParam("streamId") String streamId) { + return RESTResponse.async((builder) -> { + Optional<StreamDesc> streamDesc = proxyManager.getAllStreamDesc() + .stream().filter((desc) -> desc.getStreamId().equalsIgnoreCase(streamId)).findAny(); + if (streamDesc.isPresent()) { + builder.data(streamDesc.get()) + .status(true, Response.Status.OK); + } else { + builder.message("Stream not found, reason: stream not exist or proxy not initialized").status(false, Response.Status.BAD_REQUEST); + } + }).get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java new file mode 100644 index 0000000..4b2b333 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/KafkaStreamProxyProducerImpl.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.eagle.alert.utils.StreamValidator; +import org.apache.eagle.app.messaging.KafkaStreamSinkConfig; +import org.apache.eagle.app.messaging.StreamRecord; +import org.apache.eagle.app.proxy.stream.StreamProxyProducer; +import org.apache.eagle.metadata.model.StreamSinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +public class KafkaStreamProxyProducerImpl implements StreamProxyProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamProxyProducerImpl.class); + private final Producer producer; + private final KafkaStreamSinkConfig config; + private final String streamId; + + public KafkaStreamProxyProducerImpl(String streamId, StreamSinkConfig streamConfig) { + Preconditions.checkNotNull(streamConfig, "Stream sink config for " + 
streamId + " is null"); + this.streamId = streamId; + this.config = (KafkaStreamSinkConfig) streamConfig; + Properties properties = new Properties(); + Preconditions.checkNotNull(config.getBrokerList(), "brokerList is null"); + properties.put("metadata.broker.list", config.getBrokerList()); + properties.put("serializer.class", config.getSerializerClass()); + properties.put("key.serializer.class", config.getKeySerializerClass()); + // new added properties for async producer + properties.put("producer.type", config.getProducerType()); + properties.put("batch.num.messages", config.getNumBatchMessages()); + properties.put("request.required.acks", config.getRequestRequiredAcks()); + properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs()); + ProducerConfig producerConfig = new ProducerConfig(properties); + this.producer = new Producer(producerConfig); + } + + @Override + public void send(List<StreamRecord> events) throws IOException { + List<KeyedMessage> messages = new ArrayList<>(events.size()); + + for (StreamRecord record : events) { + String output = new ObjectMapper().writeValueAsString(record); + messages.add(new KeyedMessage(this.config.getTopicId(), output)); + } + + try { + // partition key may cause data skew + //producer.send(new KeyedMessage(this.topicId, key, output)); + producer.send(messages); + } catch (Exception ex) { + LOGGER.error(ex.getMessage(), ex); + throw ex; + } + } + + @Override + public void close() throws IOException { + if (this.producer != null) { + LOGGER.info("Closing kafka producer for stream {}", this.streamId); + this.producer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java new file mode 100644 index 0000000..10bad33 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.app.proxy.stream.impl; + +import org.apache.eagle.app.proxy.stream.StreamConfigUpdateListener; +import org.apache.eagle.app.proxy.stream.StreamMetadataUpdateService; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.model.StreamDesc; +import org.apache.eagle.metadata.service.ApplicationEntityService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataUpdateServiceImpl.class); + + private final ApplicationEntityService applicationEntityService; + private final StreamConfigUpdateListener listener; + private Map<String, StreamDesc> streamIdDescMap; + + private long startedTime = -1; + private long lastUpdatedTime = -1; + private static final long UPDATE_INTERVAL_MS = 10 * 1000; + private volatile boolean running; + private final Object lock; + + StreamMetadataUpdateServiceImpl(StreamConfigUpdateListener listener, ApplicationEntityService applicationEntityService) { + this.applicationEntityService = applicationEntityService; + this.lock = new Object(); + this.listener = listener; + this.streamIdDescMap = new HashMap<>(); + } + + @Override + public void run() { + this.startedTime = System.currentTimeMillis(); + this.running = true; + while (this.running) { + try { + updateStreamMetadata(); + Thread.sleep(UPDATE_INTERVAL_MS); + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + } + } + LOGGER.info("Shutting down"); + } + + private void updateStreamMetadata() { + synchronized (lock) { + LOGGER.debug("Loading stream metadata ..."); + int added = 0; + int changed = 0; + int removed = 0; + int total = 0; + Map<String, StreamDesc> latestStreamIdDescMap = new HashMap<>(); + for (ApplicationEntity appEntity : 
applicationEntityService.findAll()) { + List<StreamDesc> streamDescList = appEntity.getStreams(); + if (streamDescList != null && streamDescList.size() > 0) { + for (StreamDesc streamDesc : streamDescList) { + latestStreamIdDescMap.put(streamDesc.getStreamId(), streamDesc); + if (streamIdDescMap.containsKey(streamDesc.getStreamId()) && !Objects.equals(streamDesc, streamIdDescMap.get(streamDesc.getStreamId()))) { + this.listener.onStreamChanged(streamDesc); + changed++; + } else if (!streamIdDescMap.containsKey(streamDesc.getStreamId())) { + added++; + this.listener.onStreamAdded(streamDesc); + } + } + } + total++; + } + + for (String streamId : streamIdDescMap.keySet()) { + if (!latestStreamIdDescMap.containsKey(streamId)) { + removed++; + this.listener.onStreamRemoved(streamIdDescMap.get(streamId)); + } + } + this.streamIdDescMap = latestStreamIdDescMap; + if (added > 0 || changed > 0 || removed > 0) { + LOGGER.info("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed); + } else { + LOGGER.debug("Loaded stream metadata: total = {}, added = {}, changed = {}, removed = {}", total, added, changed, removed); + } + } + } + + @Override + public Map<String, StreamDesc> getStreamDescSnapshot() { + return streamIdDescMap; + } + + public void shutdown() { + this.running = false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java new file mode 100644 index 0000000..a1d0b5e --- /dev/null +++ 
b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream.impl; + +import com.google.common.base.Preconditions; +import org.apache.eagle.alert.utils.StreamValidationException; +import org.apache.eagle.alert.utils.StreamValidator; +import org.apache.eagle.app.messaging.StreamRecord; +import org.apache.eagle.app.proxy.stream.StreamProxy; +import org.apache.eagle.app.proxy.stream.StreamProxyProducer; +import org.apache.eagle.metadata.model.StreamDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class StreamProxyImpl implements StreamProxy { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyImpl.class); + private StreamProxyProducer proxyProducer; + private volatile boolean opened; + private String streamId; + private StreamValidator validator; + + @Override + public void open(StreamDesc streamDesc) { + if (streamDesc.getSchema() != null) { + this.validator = new StreamValidator(streamDesc.getSchema()); + } + this.streamId = 
streamDesc.getStreamId(); + if (streamDesc.getSinkConfig() != null) { + this.proxyProducer = new KafkaStreamProxyProducerImpl(streamDesc.getStreamId(), streamDesc.getSinkConfig()); + } else { + LOGGER.warn("Unable to initialize kafka producer because sink config is null for {}", streamId); + this.proxyProducer = null; + } + this.opened = true; + } + + @Override + public void close() throws IOException { + if (this.proxyProducer != null) { + this.proxyProducer.close(); + } + this.opened = false; + } + + @Override + public void send(List<StreamRecord> events) throws IOException { + Preconditions.checkArgument(this.opened, "Stream proxy not opened for " + streamId); + Preconditions.checkNotNull(this.proxyProducer, "Stream proxy producer not initialized for " + streamId); + if (this.validator != null) { + for (StreamRecord event : events) { + try { + this.validator.validateMap(event); + } catch (StreamValidationException e) { + throw new IOException(e); + } + } + } + this.proxyProducer.send(events); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java new file mode 100644 index 0000000..a79916c --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamProxyManagerImpl.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream.impl; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.eagle.app.proxy.stream.StreamConfigUpdateListener; +import org.apache.eagle.app.proxy.stream.StreamMetadataUpdateService; +import org.apache.eagle.app.proxy.stream.StreamProxy; +import org.apache.eagle.app.proxy.stream.StreamProxyManager; +import org.apache.eagle.app.proxy.exception.StreamProxyException; +import org.apache.eagle.metadata.model.StreamDesc; +import org.apache.eagle.metadata.service.ApplicationEntityService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Singleton +public class StreamProxyManagerImpl implements StreamProxyManager, StreamConfigUpdateListener { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamProxyManagerImpl.class); + private final StreamMetadataUpdateService streamMetadataUpdateService; + private final ConcurrentMap<String, StreamProxy> streamProxyConcurrentMap; + + @Inject + public StreamProxyManagerImpl(ApplicationEntityService applicationEntityService) { + LOGGER.info("Initializing StreamProxyManager {}", this); + this.streamMetadataUpdateService = new StreamMetadataUpdateServiceImpl(this, applicationEntityService); + 
this.streamProxyConcurrentMap = new ConcurrentHashMap<>(); + Thread streamMetadataUpdateServiceThread = new Thread(this.streamMetadataUpdateService); + streamMetadataUpdateServiceThread.setDaemon(true); + streamMetadataUpdateServiceThread.setName(StreamMetadataUpdateService.class.getName()); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + streamMetadataUpdateService.shutdown(); + for (StreamProxy proxy : streamProxyConcurrentMap.values()) { + try { + proxy.close(); + } catch (IOException e) { + LOGGER.error("Error to close {}: {}", proxy, e.getMessage(), e); + } + } + } + }); + streamMetadataUpdateServiceThread.start(); + } + + @Override + public StreamProxy getStreamProxy(String streamId) throws StreamProxyException { + if (!streamMetadataUpdateService.getStreamDescSnapshot().containsKey(streamId)) { + throw new StreamProxyException("Stream ID: " + streamId + " not found"); + } + if (!streamProxyConcurrentMap.containsKey(streamId)) { + throw new StreamProxyException("Not stream proxy instance initialized for " + streamId); + } + return streamProxyConcurrentMap.get(streamId); + } + + @Override + public Collection<StreamDesc> getAllStreamDesc() { + return streamMetadataUpdateService.getStreamDescSnapshot().values(); + } + + @Override + public void onStreamAdded(StreamDesc streamDesc) { + if (streamProxyConcurrentMap.containsKey(streamDesc.getStreamId())) { + LOGGER.warn("Adding already existing stream proxy {}", streamDesc.getStreamId()); + this.onStreamChanged(streamDesc); + return; + } + LOGGER.info("Adding stream proxy {}", streamDesc.getStreamId()); + StreamProxy proxy = new StreamProxyImpl(); + proxy.open(streamDesc); + streamProxyConcurrentMap.put(streamDesc.getStreamId(), proxy); + } + + @Override + public void onStreamChanged(StreamDesc streamDesc) { + if (!streamProxyConcurrentMap.containsKey(streamDesc.getStreamId())) { + LOGGER.warn("Updating non-existing stream proxy {}", streamDesc.getStreamId()); + 
this.onStreamAdded(streamDesc); + return; + } + LOGGER.info("Updating stream proxy {}", streamDesc.getStreamId()); + try { + LOGGER.info("Closing old stream proxy {}", streamDesc.getStreamId()); + streamProxyConcurrentMap.get(streamDesc.getStreamId()).close(); + } catch (IOException e) { + LOGGER.error("Unable to close {}", streamProxyConcurrentMap.get(streamDesc.getStreamId())); + } finally { + LOGGER.info("Adding stream proxy {}", streamDesc.getStreamId()); + StreamProxyImpl proxy = new StreamProxyImpl(); + proxy.open(streamDesc); + streamProxyConcurrentMap.put(streamDesc.getStreamId(), proxy); + } + } + + @Override + public void onStreamRemoved(StreamDesc streamDesc) { + LOGGER.info("Removing stream proxy {}", streamDesc.getStreamId()); + if (streamProxyConcurrentMap.containsKey(streamDesc.getStreamId())) { + try { + streamProxyConcurrentMap.get(streamDesc.getStreamId()).close(); + } catch (IOException e) { + LOGGER.error("Unable to close {}", streamProxyConcurrentMap.get(streamDesc.getStreamId())); + } + } else { + LOGGER.warn("Unable to remove stream proxy {}, because not exist", streamDesc.getStreamId()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java new file mode 100644 index 0000000..194ba39 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/java/org/apache/eagle/app/proxy/stream/StreamProxyTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.proxy.stream; + +import com.google.inject.Inject; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.app.messaging.KafkaStreamProvider; +import org.apache.eagle.app.messaging.KafkaStreamSinkConfig; +import org.apache.eagle.app.messaging.StreamRecord; +import org.apache.eagle.app.proxy.stream.impl.StreamProxyImpl; +import org.apache.eagle.app.test.ApplicationTestBase; +import org.apache.eagle.app.test.KafkaTestServer; +import org.apache.eagle.metadata.model.StreamDesc; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Collections; + +public class StreamProxyTest extends ApplicationTestBase { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Inject + private StreamProxyManager proxyManager; + private KafkaTestServer kafkaTestServer; + private StreamDesc streamDesc; + + @Before + public void before() throws Exception { + kafkaTestServer = KafkaTestServer.create(temporaryFolder.newFolder()); + kafkaTestServer.start(); + this.streamDesc = new StreamDesc(); + this.streamDesc.setStreamId("TEST_METRIC_STREAM"); + KafkaStreamSinkConfig sinkConfig = new KafkaStreamProvider().getSinkConfig("TEST_METRIC_STREAM", ConfigFactory.load()); + 
this.streamDesc.setSinkConfig(sinkConfig); + } + + @After + public void after() throws IOException { + kafkaTestServer.stop(); + } + + @Test + public void testProxyManagerInjection() { + Assert.assertNotNull(proxyManager); + } + + @Test + public void testStreamProxyProduce() throws IOException { + StreamProxy streamProxy = new StreamProxyImpl(); + streamProxy.open(streamDesc); + streamProxy.send(Collections.singletonList(new StreamRecord() { + { + put("metric", "DiskUsage"); + put("host", "localhost"); + put("value", 98); + } + })); + streamProxy.close(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf new file mode 100644 index 0000000..cf43880 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/test/resources/application.conf @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +{ + "application":{ + "stream": { + "provider": "org.apache.eagle.app.messaging.KafkaStreamProvider" + } + "provider":{ + "loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader" + } + } + "appId":"test_topology_name" + "spoutNum": 3 + "loaded": true + "mode":"LOCAL" + "dataSinkConfig": { + "topic" : "test_topic", + "brokerList" : "sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-app/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml index b37c31f..f0677fe 100644 --- a/eagle-core/eagle-app/pom.xml +++ b/eagle-core/eagle-app/pom.xml @@ -34,6 +34,7 @@ <modules> <module>eagle-app-base</module> <module>eagle-app-utils</module> + <module>eagle-app-streamproxy</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java index 7aaade3..1176bd9 100644 --- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java +++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/rest/RESTResponse.java @@ -29,6 +29,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; @@ -212,7 +213,7 @@ public class RESTResponse<T> { return this; } - private 
void runAsync(CompletableFuture future) { + private void runAsync(Future future) { try { future.get(); } catch (InterruptedException ex) { http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java index 0c49d82..05c874d 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java +++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -43,7 +43,7 @@ public class HadoopMetricMonitorApp extends StormApplication { .namedByField("metric") .eventTimeByField("timestamp") .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) + .granularity(Calendar.SECOND) .valueField("value")) .fromStream("SYSTEM_METRIC_STREAM") .saveAsMetric(MetricDescriptor.metricGroupByField("group") @@ -51,7 +51,7 @@ public class HadoopMetricMonitorApp extends StormApplication { .namedByField("metric") .eventTimeByField("timestamp") .dimensionFields("host", "group", "site", "device") - .granularity(Calendar.MINUTE) + .granularity(Calendar.SECOND) .valueField("value") ) .toTopology(); http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml 
b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml index eff1a70..ef13f23 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml +++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml @@ -110,7 +110,7 @@ <stream> <streamId>HADOOP_JMX_METRIC_STREAM</streamId> <description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description> - <group>Hadoop Metric</group> + <group>hadoop metrics</group> <columns> <column> <name>host</name> @@ -142,7 +142,7 @@ <stream> <streamId>SYSTEM_METRIC_STREAM</streamId> <description>System Metrics Stream including CPU, Network, Disk, etc.</description> - <group>System Metric</group> + <group>system metrics</group> <columns> <column> <name>host</name> @@ -178,7 +178,7 @@ <stream> <streamId>HADOOP_JMX_RESOURCE_STREAM</streamId> <description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description> - <group>Hadoop Metric</group> + <group>hadoop metrics</group> <columns> <column> <name>host</name> http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml index 77e974a..8db44cd 100644 --- a/eagle-server/pom.xml +++ b/eagle-server/pom.xml @@ -80,6 +80,17 @@ </dependency> <dependency> <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-streamproxy</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> <artifactId>eagle-common</artifactId> <version>${project.version}</version> <exclusions> 
http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java b/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java index dffc197..81a10c8 100644 --- a/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java +++ b/eagle-server/src/main/java/org/apache/eagle/server/security/BasicAuthRequestFilter.java @@ -24,7 +24,6 @@ import com.sun.jersey.core.util.Priority; import com.sun.jersey.spi.container.ContainerRequest; import com.sun.jersey.spi.container.ContainerRequestFilter; import io.dropwizard.auth.Auth; -import io.dropwizard.auth.AuthenticationException; import io.dropwizard.auth.Authenticator; import io.dropwizard.auth.basic.BasicCredentials; import org.apache.eagle.common.rest.RESTResponse; @@ -87,7 +86,12 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter { .status(false, Response.Status.UNAUTHORIZED) .build(); - private static final Response ALL_ACCESS_DENIED = RESTResponse.builder() + private static final Response INVALID_ACCESS_FORBIDDEN = RESTResponse.builder() + .message("Access denied, invalid username or password") + .status(false, Response.Status.FORBIDDEN) + .build(); + + private static final Response ALL_ACCESS_FORBIDDEN = RESTResponse.builder() .message("Access denied") .status(false, Response.Status.FORBIDDEN) .build(); @@ -101,7 +105,7 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter { //Access denied for all if (hasDenyAllAnnotation) { - throw new WebApplicationException(ALL_ACCESS_DENIED); + throw new WebApplicationException(ALL_ACCESS_FORBIDDEN); } //Get request headers @@ -175,8 +179,10 @@ public class BasicAuthRequestFilter implements ContainerRequestFilter { } } } else { - throw 
new WebApplicationException(UNAUTHORIZED_ACCESS_DENIED); + throw new WebApplicationException(INVALID_ACCESS_FORBIDDEN); } + } else { + throw new WebApplicationException(UNAUTHORIZED_ACCESS_DENIED); } } catch (WebApplicationException e) { throw e; http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/index.html ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/index.html b/eagle-server/src/main/webapp/app/dev/index.html index e2547bf..e9c48a6 100644 --- a/eagle-server/src/main/webapp/app/dev/index.html +++ b/eagle-server/src/main/webapp/app/dev/index.html @@ -91,40 +91,23 @@ <a> <span ng-if="!Site.current()"> <span class="fa fa-home"></span> - Overview + Sites Overview </span> <span ng-if="Site.current()"> <span class="fa fa-server"></span> - {{Site.current().siteName || Site.current().siteId}} + Site: {{Site.current().siteName || Site.current().siteId}} </span> </a> <ul class="dropdown-menu"> - <li><a ng-click="Site.switchSite()"><span class="fa fa-home"></span> Overview</a></li> + <li><a ng-click="Site.switchSite()"><span class="fa fa-home"></span> Sites Overview</a></li> <li ng-repeat="site in Site.list track by $index"> <a ng-click="Site.switchSite(site)"> - <span class="fa fa-server"></span> {{site.siteName || site.siteId}} + <span class="fa fa-server"></span> Site: {{site.siteName || site.siteId}} </a> </li> </ul> </li> - - <!-- FAQ --> - <li> - <a data-toggle="dropdown" aria-expanded="false"> - <i class="glyphicon glyphicon-question-sign"></i> - </a> - - <ul class="dropdown-menu"> - <li><a>How to start using eagle</a></li> - <li><a>How to register new site</a></li> - <li><a>How to install application</a></li> - <li><a>How to manage application</a></li> - <li><a>How to develop application</a></li> - <li><a ui-sref="metricPreview()">Preview eagle metric</a></li> - </ul> - </li> - <!-- Notification --> <li class="hover-dropdown"> <a> @@ -146,6 +129,21 
@@ </ul> </li> + <!-- FAQ --> + <li> + <a data-toggle="dropdown" aria-expanded="false"> + <i class="glyphicon glyphicon-question-sign"></i> + </a> + + <ul class="dropdown-menu"> + <li><a>How to start using eagle</a></li> + <li><a>How to register new site</a></li> + <li><a>How to install application</a></li> + <li><a>How to manage application</a></li> + <li><a>How to develop application</a></li> + <li><a ui-sref="metricPreview()">Preview eagle metric</a></li> + </ul> + </li> <!-- Auth --> <li ng-if="!Auth.isLogin"> <a ui-sref="login">Login</a> http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/partials/alert/list.html ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/list.html b/eagle-server/src/main/webapp/app/dev/partials/alert/list.html index 9be9a56..371248f 100644 --- a/eagle-server/src/main/webapp/app/dev/partials/alert/list.html +++ b/eagle-server/src/main/webapp/app/dev/partials/alert/list.html @@ -20,7 +20,7 @@ <div class="box-header with-border"> <span class="fa fa-bell"></span> <h3 class="box-title"> - Alert List + Alert Incidents </h3> </div> <div class="box-body"> http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js index 9871ab7..88b44cd 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertCtrl.js @@ -25,7 +25,7 @@ // = Alert = // ====================================================================================== eagleControllers.controller('alertListCtrl', function ($scope, $wrapState, PageConfig, CompatibleEntity, Time) { - PageConfig.title = 
"Alerts"; + PageConfig.title = "Alert Incidents"; $scope.site = $wrapState.param.siteId; $scope.alertList = []; @@ -86,7 +86,7 @@ // = Stream = // ====================================================================================== eagleControllers.controller('alertStreamListCtrl', function ($scope, $wrapState, PageConfig, Application, Entity) { - PageConfig.title = "Streams"; + PageConfig.title = "Alert Streams"; $scope.streamList = []; $scope.site = $wrapState.param.siteId; @@ -119,7 +119,7 @@ // = Policy = // ====================================================================================== eagleControllers.controller('policyListCtrl', function ($scope, $wrapState, PageConfig, Entity, Policy) { - PageConfig.title = "Policies"; + PageConfig.title = "Alert Policies"; $scope.loading = false; $scope.policyList = []; http://git-wip-us.apache.org/repos/asf/eagle/blob/d5dce2b3/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js index 97b4704..03dfef9 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/mainCtrl.js @@ -25,7 +25,7 @@ // = Home = // ====================================================================================== eagleControllers.controller('homeCtrl', function ($scope, $wrapState, PageConfig) { - PageConfig.title = "Overveiw"; + PageConfig.title = "Overview"; $scope.colorList = [ "bg-aqua",
