Repository: metron Updated Branches: refs/heads/master a9079f546 -> 7f9f86c00
METRON-1128: MAAS_GET_ENDPOINT - Unable to resolve function error seen for squid example this closes apache/incubator-metron#714 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7f9f86c0 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7f9f86c0 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7f9f86c0 Branch: refs/heads/master Commit: 7f9f86c005eb5d9d966d33df95cd060e2f8784bb Parents: a9079f5 Author: cstella <ceste...@gmail.com> Authored: Tue Aug 29 09:50:23 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Tue Aug 29 09:50:23 2017 -0400 ---------------------------------------------------------------------- .gitignore | 2 +- metron-analytics/metron-maas-common/pom.xml | 11 + .../metron/maas/functions/MaaSFunctions.java | 324 +++++++++++++++++++ metron-analytics/metron-maas-service/README.md | 21 +- metron-analytics/metron-maas-service/pom.xml | 11 - .../metron/maas/functions/MaaSFunctions.java | 324 ------------------- .../org/apache/metron/maas/service/Client.java | 15 + .../apache/metron/maas/service/Constants.java | 5 +- .../metron/maas/service/runner/Runner.java | 9 +- .../metron/maas/submit/ModelSubmission.java | 5 + 10 files changed, 383 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index d505c9c..12fd7cd 100644 --- a/.gitignore +++ b/.gitignore @@ -21,5 +21,5 @@ tmp/** tmp/**/* temp/** temp/**/* - +metron-interface/metron-alerts/node/ repodata/ http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-common/pom.xml b/metron-analytics/metron-maas-common/pom.xml index 13fb7b9..6921e51 100644 --- a/metron-analytics/metron-maas-common/pom.xml +++ b/metron-analytics/metron-maas-common/pom.xml @@ -34,6 +34,17 @@ </properties> <dependencies> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>stellar-common</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java b/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java new file mode 100644 index 0000000..eacb64d --- /dev/null +++ b/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.maas.functions; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.lang.invoke.MethodHandles; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.maas.config.Endpoint; +import org.apache.metron.maas.config.MaaSConfig; +import org.apache.metron.maas.config.ModelEndpoint; +import org.apache.metron.maas.discovery.ServiceDiscoverer; +import org.apache.metron.maas.util.ConfigUtil; +import org.apache.metron.maas.util.RESTUtil; +import org.apache.metron.stellar.common.utils.JSONUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.Stellar; +import org.apache.metron.stellar.dsl.StellarFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaaSFunctions { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static class ModelCacheKey { + String name; + String version; + String method; + Map<String, String> args; + public ModelCacheKey(String name, String version, String method, Map<String, String> args) { + this.name = name; + this.version = version; + this.method = method; + this.args = args; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ModelCacheKey that = (ModelCacheKey) o; + + if (name != null ? !name.equals(that.name) : that.name != null) return false; + if (version != null ? !version.equals(that.version) : that.version != null) return false; + if (method != null ? !method.equals(that.method) : that.method != null) return false; + return args != null ? args.equals(that.args) : that.args == null; + + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (version != null ? version.hashCode() : 0); + result = 31 * result + (method != null ? method.hashCode() : 0); + result = 31 * result + (args != null ? args.hashCode() : 0); + return result; + } + } + + @Stellar(name="MODEL_APPLY" + , namespace="MAAS" + , description = "Returns the output of a model deployed via Model as a Service. NOTE: Results are cached locally for 10 minutes." + , params = { "endpoint - A map containing the name, version, and url for the REST endpoint" + , "function - The optional endpoint path; default is 'apply'" + , "model_args - A Dictionary of arguments for the model (these become request params)" + } + , returns = "The output of the model deployed as a REST endpoint in Map form. Assumes REST endpoint returns a JSON Map." + ) + public static class ModelApply implements StellarFunction { + private boolean isInitialized = false; + private ServiceDiscoverer discoverer; + private Cache<ModelCacheKey, Map<String, Object> > resultCache; + public ModelApply() { + resultCache = CacheBuilder.newBuilder() + .concurrencyLevel(4) + .weakKeys() + .maximumSize(100000) + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(); + } + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(args.size() < 2) { + throw new ParseException("Unable to execute model_apply. " + + "Expected arguments: endpoint_map:map, " + + " [endpoint method:string], model_args:map" + ); + } + if(!isInitialized) { + return null; + } + int i = 0; + if(args.size() == 0) { + return null; + } + Object endpointObj = args.get(i++); + Map endpoint = null; + String modelName; + String modelVersion; + String modelUrl; + if(endpointObj instanceof Map) { + endpoint = (Map)endpointObj; + modelName = endpoint.get("name") + ""; + modelVersion = endpoint.get("version") + ""; + modelUrl = endpoint.get("url") + ""; + } + else { + return null; + } + String modelFunction = "apply"; + Map<String, String> modelArgs = new HashMap<>(); + if(args.get(i) instanceof String) { + String func = (String)args.get(i); + if(endpoint.containsKey("endpoint:" + func)) { + modelFunction = "" + endpoint.get("endpoint:" + func); + } + else { + modelFunction = func; + } + i++; + } + + if(args.get(i) instanceof Map) { + if(endpoint.containsKey("endpoint:apply")) { + modelFunction = "" + endpoint.get("endpoint:apply"); + } + modelArgs = (Map)args.get(i); + } + if( modelName == null + || modelVersion == null + || modelFunction == null + ) { + return null; + } + ModelCacheKey cacheKey = new ModelCacheKey(modelName, modelVersion, modelFunction, modelArgs); + Map<String, Object> ret = resultCache.getIfPresent(cacheKey); + if(ret != null) { + return ret; + } + else { + String url = modelUrl; + if (url.endsWith("/")) { + url = url.substring(0, url.length() - 1); + } + if (modelFunction.startsWith("/")) { + modelFunction = modelFunction.substring(1); + } + try { + URL u = new URL(url + "/" + modelFunction); + + String results = RESTUtil.INSTANCE.getRESTJSONResults(u, modelArgs); + ret = JSONUtils.INSTANCE.load(results, new TypeReference<Map<String, Object>>() { + }); + resultCache.put(cacheKey, ret); + return ret; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + if (discoverer != null) { + try { + URL u = new URL(modelUrl); + discoverer.blacklist(u); + } catch (MalformedURLException e1) { + } + } + } + } + return null; + } + + @Override + public synchronized void initialize(Context context) { + + try { + Optional<ServiceDiscoverer> discovererOpt = (Optional) (context.getCapability(Context.Capabilities.SERVICE_DISCOVERER)); + if (discovererOpt.isPresent()) { + discoverer = discovererOpt.get(); + } + else { + Optional<Object> clientOptional = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); + CuratorFramework client = null; + if (clientOptional.isPresent() && clientOptional.get() instanceof CuratorFramework) { + client = (CuratorFramework) clientOptional.get(); + } else { + throw new IllegalStateException("Unable to initialize function: Cannot find zookeeper client."); + } + discoverer = createDiscoverer(client); + } + } + catch(Exception ex) { + LOG.error(ex.getMessage(), ex); + } + finally { + //We always want to set initialize to true because we don't want to keep trying to initialize over and over + isInitialized = true; + } + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + } + + private static ServiceDiscoverer createDiscoverer(CuratorFramework client) throws Exception { + MaaSConfig config = ConfigUtil.INSTANCE.read(client, "/metron/maas/config", new MaaSConfig(), MaaSConfig.class); + ServiceDiscoverer discoverer = new ServiceDiscoverer(client, config.getServiceRoot()); + discoverer.start(); + return discoverer; + } + + @Stellar(name="GET_ENDPOINT" + , namespace="MAAS" + , description="Inspects ZooKeeper and returns a map containing the name, version and url for the model referred to by the input parameters." + , params = { + "model_name - The name of the model" + ,"model_version - The optional version of the model. If the model version is not specified, the most current version is used." + } + , returns = "A map containing the name, version, and url for the REST endpoint (fields named name, version and url). " + + "Note that the output of this function is suitable for input into the first argument of MAAS_MODEL_APPLY." + ) + public static class GetEndpoint implements StellarFunction { + ServiceDiscoverer discoverer; + private boolean isInitialized = false; + private boolean isValidState = false; + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(!isValidState) { + LOG.error("Invalid state: Unable to find ServiceDiscoverer service."); + return null; + } + String modelName = null; + String modelVersion = null; + if(args.size() >= 1) { + modelName = args.get(0).toString(); + } + if(args.size() >= 2) + { + modelVersion = args.get(1).toString(); + } + if(modelName == null) { + return null; + } + try { + ModelEndpoint ep = null; + if (modelVersion == null) { + ep = discoverer.getEndpoint(modelName); + } else { + ep = discoverer.getEndpoint(modelName, modelVersion); + } + return ep == null ? null : endpointToMap(ep.getName(), ep.getVersion(), ep.getEndpoint()); + } + catch(Exception ex) { + LOG.error("Unable to discover endpoint: {}", ex.getMessage(), ex); + return null; + } + } + + public static Map<String, String> endpointToMap(String name, String version, Endpoint ep) { + Map<String, String> ret = new HashMap<>(); + ret.put("url", ep.getUrl()); + ret.put("name", name); + ret.put("version", version); + for(Map.Entry<String, String> kv : ep.getFunctions().entrySet()) { + ret.put("endpoint:" + kv.getKey(), kv.getValue()); + } + return ret; + } + + @Override + public synchronized void initialize(Context context) { + try { + Optional<Object> clientOptional = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); + CuratorFramework client = null; + if (clientOptional.isPresent() && clientOptional.get() instanceof CuratorFramework) { + client = (CuratorFramework) clientOptional.get(); + } else { + throw new IllegalStateException("Unable to initialize function: Cannot find zookeeper client."); + } + try { + discoverer = createDiscoverer(client); + context.addCapability(Context.Capabilities.SERVICE_DISCOVERER, () -> discoverer); + isValidState = true; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IllegalStateException("Unable to initialize MAAS_GET_ENDPOINT", e); + } + } + finally { + isInitialized = true; + } + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/README.md ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/README.md b/metron-analytics/metron-maas-service/README.md index 5ed108b..d234130 100644 --- a/metron-analytics/metron-maas-service/README.md +++ b/metron-analytics/metron-maas-service/README.md @@ -126,16 +126,29 @@ Now let's install some prerequisites: Start Squid via `service squid start` Now that we have flask and jinja, we can create a mock DGA service to deploy with MaaS: -* Download the files in [this](https://gist.github.com/cestella/cba10aff0f970078a4c2c8cade3a4d1a) gist into the `/root/mock_dga` directory -* Make `rest.sh` executable via `chmod +x /root/mock_dga/rest.sh` +* Download the files in [this](https://gist.github.com/cestella/cba10aff0f970078a4c2c8cade3a4d1a) gist into the `$HOME/mock_dga` directory +* Make `rest.sh` executable via `chmod +x $HOME/mock_dga/rest.sh` This service will treat `yahoo.com` and `amazon.com` as legit and everything else as malicious. The contract is that the REST service exposes an endpoint `/apply` and returns back JSON maps with a single key `is_malicious` which can be `malicious` or `legit`. ## Deploy Mock DGA Service via MaaS +The following presumes that you are a logged in as a user who has a +home directory in HDFS under `/user/$USER`. If you do not, please create one +and ensure the permissions are set appropriate: +``` +su - hdfs -c "hadoop fs -mkdir /user/$USER" +su - hdfs -c "hadoop fs -chown $USER:$USER /user/$USER" +``` +Or, in the common case for the `metron` user: +``` +su - hdfs -c "hadoop fs -mkdir /user/metron" +su - hdfs -c "hadoop fs -chown metron:metron /user/metron" +``` + Now let's start MaaS and deploy the Mock DGA Service: * Start MaaS via `$METRON_HOME/bin/maas_service.sh -zq node1:2181` -* Start one instance of the mock DGA model with 512M of memory via `$METRON_HOME/bin/maas_deploy.sh -zq node1:2181 -lmp /root/mock_dga -hmp /user/root/models -mo ADD -m 512 -n dga -v 1.0 -ni 1` +* Start one instance of the mock DGA model with 512M of memory via `$METRON_HOME/bin/maas_deploy.sh -zq node1:2181 -lmp $HOME/mock_dga -hmp /user/$USER/models -mo ADD -m 512 -n dga -v 1.0 -ni 1` * As a sanity check: * Ensure that the model is running via `$METRON_HOME/bin/maas_deploy.sh -zq node1:2181 -mo LIST`. You should see `Model dga @ 1.0` be displayed and under that a url such as (but not exactly) `http://node1:36161` * Try to hit the model via curl: `curl 'http://localhost:36161/apply?host=caseystella.com'` and ensure that it returns a JSON map indicating the domain is malicious. @@ -170,8 +183,6 @@ Now that we have a deployed model, let's adjust the configurations for the Squid * Edit the squid enrichment configuration at `$METRON_HOME/config/zookeeper/enrichments/squid.json` (this file will not exist, so create a new one) to make the threat triage adjust the level of risk based on the model output: ``` { - "index": "squid", - "batchSize": 1, "enrichment" : { "fieldMap": {} }, http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/pom.xml b/metron-analytics/metron-maas-service/pom.xml index 0ac9bac..4eeceae 100644 --- a/metron-analytics/metron-maas-service/pom.xml +++ b/metron-analytics/metron-maas-service/pom.xml @@ -49,17 +49,6 @@ <version>${global_kryo_serializers_version}</version> </dependency> <dependency> - <groupId>org.apache.metron</groupId> - <artifactId>stellar-common</artifactId> - <version>${project.parent.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-common</artifactId> <version>${hadoop.version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java deleted file mode 100644 index eacb64d..0000000 --- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.maas.functions; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.lang.invoke.MethodHandles; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.apache.curator.framework.CuratorFramework; -import org.apache.metron.maas.config.Endpoint; -import org.apache.metron.maas.config.MaaSConfig; -import org.apache.metron.maas.config.ModelEndpoint; -import org.apache.metron.maas.discovery.ServiceDiscoverer; -import org.apache.metron.maas.util.ConfigUtil; -import org.apache.metron.maas.util.RESTUtil; -import org.apache.metron.stellar.common.utils.JSONUtils; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.ParseException; -import org.apache.metron.stellar.dsl.Stellar; -import org.apache.metron.stellar.dsl.StellarFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MaaSFunctions { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static class ModelCacheKey { - String name; - String version; - String method; - Map<String, String> args; - public ModelCacheKey(String name, String version, String method, Map<String, String> args) { - this.name = name; - this.version = version; - this.method = method; - this.args = args; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ModelCacheKey that = (ModelCacheKey) o; - - if (name != null ? !name.equals(that.name) : that.name != null) return false; - if (version != null ? !version.equals(that.version) : that.version != null) return false; - if (method != null ? !method.equals(that.method) : that.method != null) return false; - return args != null ? args.equals(that.args) : that.args == null; - - } - - @Override - public int hashCode() { - int result = name != null ? name.hashCode() : 0; - result = 31 * result + (version != null ? version.hashCode() : 0); - result = 31 * result + (method != null ? method.hashCode() : 0); - result = 31 * result + (args != null ? args.hashCode() : 0); - return result; - } - } - - @Stellar(name="MODEL_APPLY" - , namespace="MAAS" - , description = "Returns the output of a model deployed via Model as a Service. NOTE: Results are cached locally for 10 minutes." - , params = { "endpoint - A map containing the name, version, and url for the REST endpoint" - , "function - The optional endpoint path; default is 'apply'" - , "model_args - A Dictionary of arguments for the model (these become request params)" - } - , returns = "The output of the model deployed as a REST endpoint in Map form. Assumes REST endpoint returns a JSON Map." - ) - public static class ModelApply implements StellarFunction { - private boolean isInitialized = false; - private ServiceDiscoverer discoverer; - private Cache<ModelCacheKey, Map<String, Object> > resultCache; - public ModelApply() { - resultCache = CacheBuilder.newBuilder() - .concurrencyLevel(4) - .weakKeys() - .maximumSize(100000) - .expireAfterWrite(10, TimeUnit.MINUTES) - .build(); - } - - @Override - public Object apply(List<Object> args, Context context) throws ParseException { - if(args.size() < 2) { - throw new ParseException("Unable to execute model_apply. " + - "Expected arguments: endpoint_map:map, " + - " [endpoint method:string], model_args:map" - ); - } - if(!isInitialized) { - return null; - } - int i = 0; - if(args.size() == 0) { - return null; - } - Object endpointObj = args.get(i++); - Map endpoint = null; - String modelName; - String modelVersion; - String modelUrl; - if(endpointObj instanceof Map) { - endpoint = (Map)endpointObj; - modelName = endpoint.get("name") + ""; - modelVersion = endpoint.get("version") + ""; - modelUrl = endpoint.get("url") + ""; - } - else { - return null; - } - String modelFunction = "apply"; - Map<String, String> modelArgs = new HashMap<>(); - if(args.get(i) instanceof String) { - String func = (String)args.get(i); - if(endpoint.containsKey("endpoint:" + func)) { - modelFunction = "" + endpoint.get("endpoint:" + func); - } - else { - modelFunction = func; - } - i++; - } - - if(args.get(i) instanceof Map) { - if(endpoint.containsKey("endpoint:apply")) { - modelFunction = "" + endpoint.get("endpoint:apply"); - } - modelArgs = (Map)args.get(i); - } - if( modelName == null - || modelVersion == null - || modelFunction == null - ) { - return null; - } - ModelCacheKey cacheKey = new ModelCacheKey(modelName, modelVersion, modelFunction, modelArgs); - Map<String, Object> ret = resultCache.getIfPresent(cacheKey); - if(ret != null) { - return ret; - } - else { - String url = modelUrl; - if (url.endsWith("/")) { - url = url.substring(0, url.length() - 1); - } - if (modelFunction.startsWith("/")) { - modelFunction = modelFunction.substring(1); - } - try { - URL u = new URL(url + "/" + modelFunction); - - String results = RESTUtil.INSTANCE.getRESTJSONResults(u, modelArgs); - ret = JSONUtils.INSTANCE.load(results, new TypeReference<Map<String, Object>>() { - }); - resultCache.put(cacheKey, ret); - return ret; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - if (discoverer != null) { - try { - URL u = new URL(modelUrl); - discoverer.blacklist(u); - } catch (MalformedURLException e1) { - } - } - } - } - return null; - } - - @Override - public synchronized void initialize(Context context) { - - try { - Optional<ServiceDiscoverer> discovererOpt = (Optional) (context.getCapability(Context.Capabilities.SERVICE_DISCOVERER)); - if (discovererOpt.isPresent()) { - discoverer = discovererOpt.get(); - } - else { - Optional<Object> clientOptional = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); - CuratorFramework client = null; - if (clientOptional.isPresent() && clientOptional.get() instanceof CuratorFramework) { - client = (CuratorFramework) clientOptional.get(); - } else { - throw new IllegalStateException("Unable to initialize function: Cannot find zookeeper client."); - } - discoverer = createDiscoverer(client); - } - } - catch(Exception ex) { - LOG.error(ex.getMessage(), ex); - } - finally { - //We always want to set initialize to true because we don't want to keep trying to initialize over and over - isInitialized = true; - } - } - - @Override - public boolean isInitialized() { - return isInitialized; - } - } - - private static ServiceDiscoverer createDiscoverer(CuratorFramework client) throws Exception { - MaaSConfig config = ConfigUtil.INSTANCE.read(client, "/metron/maas/config", new MaaSConfig(), MaaSConfig.class); - ServiceDiscoverer discoverer = new ServiceDiscoverer(client, config.getServiceRoot()); - discoverer.start(); - return discoverer; - } - - @Stellar(name="GET_ENDPOINT" - , namespace="MAAS" - , description="Inspects ZooKeeper and returns a map containing the name, version and url for the model referred to by the input parameters." - , params = { - "model_name - The name of the model" - ,"model_version - The optional version of the model. If the model version is not specified, the most current version is used." - } - , returns = "A map containing the name, version, and url for the REST endpoint (fields named name, version and url). " + - "Note that the output of this function is suitable for input into the first argument of MAAS_MODEL_APPLY." - ) - public static class GetEndpoint implements StellarFunction { - ServiceDiscoverer discoverer; - private boolean isInitialized = false; - private boolean isValidState = false; - - @Override - public Object apply(List<Object> args, Context context) throws ParseException { - if(!isValidState) { - LOG.error("Invalid state: Unable to find ServiceDiscoverer service."); - return null; - } - String modelName = null; - String modelVersion = null; - if(args.size() >= 1) { - modelName = args.get(0).toString(); - } - if(args.size() >= 2) - { - modelVersion = args.get(1).toString(); - } - if(modelName == null) { - return null; - } - try { - ModelEndpoint ep = null; - if (modelVersion == null) { - ep = discoverer.getEndpoint(modelName); - } else { - ep = discoverer.getEndpoint(modelName, modelVersion); - } - return ep == null ? null : endpointToMap(ep.getName(), ep.getVersion(), ep.getEndpoint()); - } - catch(Exception ex) { - LOG.error("Unable to discover endpoint: {}", ex.getMessage(), ex); - return null; - } - } - - public static Map<String, String> endpointToMap(String name, String version, Endpoint ep) { - Map<String, String> ret = new HashMap<>(); - ret.put("url", ep.getUrl()); - ret.put("name", name); - ret.put("version", version); - for(Map.Entry<String, String> kv : ep.getFunctions().entrySet()) { - ret.put("endpoint:" + kv.getKey(), kv.getValue()); - } - return ret; - } - - @Override - public synchronized void initialize(Context context) { - try { - Optional<Object> clientOptional = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); - CuratorFramework client = null; - if (clientOptional.isPresent() && clientOptional.get() instanceof CuratorFramework) { - client = (CuratorFramework) clientOptional.get(); - } else { - throw new IllegalStateException("Unable to initialize function: Cannot find zookeeper client."); - } - try { - discoverer = createDiscoverer(client); - context.addCapability(Context.Capabilities.SERVICE_DISCOVERER, () -> discoverer); - isValidState = true; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IllegalStateException("Unable to initialize MAAS_GET_ENDPOINT", e); - } - } - finally { - isInitialized = true; - } - } - - @Override - public boolean isInitialized() { - return isInitialized; - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java index c2d8906..9cabf21 100644 --- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java +++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java @@ -24,6 +24,7 @@ import java.util.*; import java.util.function.Function; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.commons.cli.*; import org.apache.commons.cli.CommandLine; @@ -558,6 +559,7 @@ public class Client { // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(conf); + createMaaSDirectory(fs, appId.toString()); Path ajPath = addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); // Set the log4j properties if needed @@ -789,6 +791,18 @@ public class Client { yarnClient.killApplication(appId); } + private void createMaaSDirectory(FileSystem fs, String appId) throws IOException { + for(Path p : ImmutableList.of(new Path(fs.getHomeDirectory(), appName) + , new Path(fs.getHomeDirectory(), appName + "/" + appId) + ) + ) { + if(!fs.exists(p)) { + fs.mkdirs(p); + fs.setPermission(p, new FsPermission((short)0755)); + } + } + } + private Path addToLocalResources(FileSystem fs, String fileSrcPath, String fileDstPath, String appId, Map<String, LocalResource> localResources, String resources) throws IOException { @@ -808,6 +822,7 @@ public class Client { } else { fs.copyFromLocalFile(new Path(fileSrcPath), dst); } + fs.setPermission(dst, new FsPermission((short)0755)); FileStatus scFileStatus = fs.getFileStatus(dst); LocalResource scRsrc = LocalResource.newInstance( http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java index ac2c950..d032511 100644 --- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java +++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java @@ -31,5 +31,8 @@ public class Constants { * Environment key name denoting the timeline domain ID. */ public static final String TIMELINEDOMAIN = "TIMELINEDOMAIN"; - + /* + The filename which communicates the endpoint information for a deployed model + */ + public static final String ENDPOINT_DAT = "endpoint.dat"; } http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java index cc297d2..8f0b9e5 100644 --- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java +++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java @@ -33,6 +33,7 @@ import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.*; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.metron.maas.config.Endpoint; +import org.apache.metron.maas.service.Constants; import org.apache.metron.maas.util.ConfigUtil; import org.apache.metron.maas.config.MaaSConfig; import org.apache.metron.maas.config.ModelEndpoint; @@ -202,7 +203,11 @@ public class Runner { serviceDiscovery.start(); File cwd = new File(script).getParentFile(); - final String cmd = new File(cwd, script).getAbsolutePath(); + File scriptFile = new File(cwd, script); + if(scriptFile.exists() && !scriptFile.canExecute()) { + scriptFile.setExecutable(true); + } + final String cmd = scriptFile.getAbsolutePath(); try { p = new ProcessBuilder(cmd).directory(cwd).start(); @@ -299,7 +304,7 @@ public class Runner { private static Endpoint readEndpoint(File cwd) throws Exception { String content = ""; - File f = new File(cwd, "endpoint.dat"); + File f = new File(cwd, Constants.ENDPOINT_DAT); for(int i = 0;i < NUM_ATTEMPTS;i++) { if(f.exists()) { try { http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java index ebfa904..fcae40a 100644 --- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java +++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.PropertyConfigurator; import org.apache.metron.maas.config.*; import org.apache.metron.maas.discovery.ServiceDiscoverer; +import org.apache.metron.maas.service.Constants; import org.apache.metron.maas.service.Log4jPropertyHelper; import org.apache.metron.maas.util.ConfigUtil; import org.apache.metron.maas.queue.Queue; @@ -247,6 +248,10 @@ public class ModelSubmission { fs.mkdirs(hdfsPath); } for(File f : localDir.listFiles()) { + if(f.getName().equals(Constants.ENDPOINT_DAT)) { + //skip the endpoint if it exists accidentally, we don't want to localize that. + continue; + } Path p = new Path(hdfsPath, f.getName()); FSDataOutputStream out = fs.create(p); BufferedInputStream in = new BufferedInputStream(new FileInputStream(f));