Repository: metron
Updated Branches:
  refs/heads/master a9079f546 -> 7f9f86c00


METRON-1128: MAAS_GET_ENDPOINT - Unable to resolve function error seen for 
squid example this closes apache/incubator-metron#714


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7f9f86c0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7f9f86c0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7f9f86c0

Branch: refs/heads/master
Commit: 7f9f86c005eb5d9d966d33df95cd060e2f8784bb
Parents: a9079f5
Author: cstella <ceste...@gmail.com>
Authored: Tue Aug 29 09:50:23 2017 -0400
Committer: cstella <ceste...@gmail.com>
Committed: Tue Aug 29 09:50:23 2017 -0400

----------------------------------------------------------------------
 .gitignore                                      |   2 +-
 metron-analytics/metron-maas-common/pom.xml     |  11 +
 .../metron/maas/functions/MaaSFunctions.java    | 324 +++++++++++++++++++
 metron-analytics/metron-maas-service/README.md  |  21 +-
 metron-analytics/metron-maas-service/pom.xml    |  11 -
 .../metron/maas/functions/MaaSFunctions.java    | 324 -------------------
 .../org/apache/metron/maas/service/Client.java  |  15 +
 .../apache/metron/maas/service/Constants.java   |   5 +-
 .../metron/maas/service/runner/Runner.java      |   9 +-
 .../metron/maas/submit/ModelSubmission.java     |   5 +
 10 files changed, 383 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index d505c9c..12fd7cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,5 +21,5 @@ tmp/**
 tmp/**/*
 temp/**
 temp/**/*
-
+metron-interface/metron-alerts/node/
 repodata/

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-common/pom.xml 
b/metron-analytics/metron-maas-common/pom.xml
index 13fb7b9..6921e51 100644
--- a/metron-analytics/metron-maas-common/pom.xml
+++ b/metron-analytics/metron-maas-common/pom.xml
@@ -34,6 +34,17 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>stellar-common</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
       <version>1.2</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
 
b/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
new file mode 100644
index 0000000..eacb64d
--- /dev/null
+++ 
b/metron-analytics/metron-maas-common/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.maas.functions;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.maas.config.Endpoint;
+import org.apache.metron.maas.config.MaaSConfig;
+import org.apache.metron.maas.config.ModelEndpoint;
+import org.apache.metron.maas.discovery.ServiceDiscoverer;
+import org.apache.metron.maas.util.ConfigUtil;
+import org.apache.metron.maas.util.RESTUtil;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MaaSFunctions {
+ protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static class ModelCacheKey {
+    String name;
+    String version;
+    String method;
+    Map<String, String> args;
+    public ModelCacheKey(String name, String version, String method, 
Map<String, String> args) {
+      this.name = name;
+      this.version = version;
+      this.method = method;
+      this.args = args;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ModelCacheKey that = (ModelCacheKey) o;
+
+      if (name != null ? !name.equals(that.name) : that.name != null) return 
false;
+      if (version != null ? !version.equals(that.version) : that.version != 
null) return false;
+      if (method != null ? !method.equals(that.method) : that.method != null) 
return false;
+      return args != null ? args.equals(that.args) : that.args == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = name != null ? name.hashCode() : 0;
+      result = 31 * result + (version != null ? version.hashCode() : 0);
+      result = 31 * result + (method != null ? method.hashCode() : 0);
+      result = 31 * result + (args != null ? args.hashCode() : 0);
+      return result;
+    }
+  }
+
+  @Stellar(name="MODEL_APPLY"
+          , namespace="MAAS"
+          , description = "Returns the output of a model deployed via Model as 
a Service. NOTE: Results are cached locally for 10 minutes."
+          , params = { "endpoint - A map containing the name, version, and url 
for the REST endpoint"
+                     , "function - The optional endpoint path; default is 
'apply'"
+                     , "model_args - A Dictionary of arguments for the model 
(these become request params)"
+                     }
+          , returns = "The output of the model deployed as a REST endpoint in 
Map form.  Assumes REST endpoint returns a JSON Map."
+          )
+  public static class ModelApply implements StellarFunction {
+    private boolean isInitialized = false;
+    private ServiceDiscoverer discoverer;
+    private Cache<ModelCacheKey, Map<String, Object> > resultCache;
+    public ModelApply() {
+      resultCache = CacheBuilder.newBuilder()
+                            .concurrencyLevel(4)
+                            .weakKeys()
+                            .maximumSize(100000)
+                            .expireAfterWrite(10, TimeUnit.MINUTES)
+                            .build();
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(args.size() < 2) {
+        throw new ParseException("Unable to execute model_apply. " +
+                                 "Expected arguments: endpoint_map:map, " +
+                                 " [endpoint method:string], model_args:map"
+                                 );
+      }
+      if(!isInitialized) {
+        return null;
+      }
+      int i = 0;
+      if(args.size() == 0) {
+        return null;
+      }
+      Object endpointObj = args.get(i++);
+      Map endpoint = null;
+      String modelName;
+      String modelVersion;
+      String modelUrl;
+      if(endpointObj instanceof Map) {
+        endpoint = (Map)endpointObj;
+        modelName = endpoint.get("name") + "";
+        modelVersion = endpoint.get("version") + "";
+        modelUrl = endpoint.get("url") + "";
+      }
+      else {
+        return null;
+      }
+      String modelFunction = "apply";
+      Map<String, String> modelArgs = new HashMap<>();
+      if(args.get(i) instanceof String) {
+        String func = (String)args.get(i);
+        if(endpoint.containsKey("endpoint:" + func)) {
+          modelFunction = "" + endpoint.get("endpoint:" + func);
+        }
+        else {
+          modelFunction = func;
+        }
+        i++;
+      }
+
+      if(args.get(i) instanceof Map) {
+        if(endpoint.containsKey("endpoint:apply")) {
+          modelFunction = "" + endpoint.get("endpoint:apply");
+        }
+        modelArgs = (Map)args.get(i);
+      }
+      if( modelName == null
+       || modelVersion == null
+       || modelFunction == null
+        ) {
+        return null;
+      }
+      ModelCacheKey cacheKey = new ModelCacheKey(modelName, modelVersion, 
modelFunction, modelArgs);
+      Map<String, Object> ret = resultCache.getIfPresent(cacheKey);
+      if(ret != null) {
+        return ret;
+      }
+      else {
+        String url = modelUrl;
+        if (url.endsWith("/")) {
+          url = url.substring(0, url.length() - 1);
+        }
+        if (modelFunction.startsWith("/")) {
+          modelFunction = modelFunction.substring(1);
+        }
+        try {
+          URL u = new URL(url + "/" + modelFunction);
+
+          String results = RESTUtil.INSTANCE.getRESTJSONResults(u, modelArgs);
+          ret = JSONUtils.INSTANCE.load(results, new TypeReference<Map<String, 
Object>>() {
+          });
+          resultCache.put(cacheKey, ret);
+          return ret;
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          if (discoverer != null) {
+            try {
+              URL u = new URL(modelUrl);
+              discoverer.blacklist(u);
+            } catch (MalformedURLException e1) {
+            }
+          }
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public synchronized void initialize(Context context) {
+
+      try {
+        Optional<ServiceDiscoverer> discovererOpt = (Optional) 
(context.getCapability(Context.Capabilities.SERVICE_DISCOVERER));
+        if (discovererOpt.isPresent()) {
+          discoverer = discovererOpt.get();
+        }
+        else {
+          Optional<Object> clientOptional = 
context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
+          CuratorFramework client = null;
+          if (clientOptional.isPresent() && clientOptional.get() instanceof 
CuratorFramework) {
+            client = (CuratorFramework) clientOptional.get();
+          } else {
+            throw new IllegalStateException("Unable to initialize function: 
Cannot find zookeeper client.");
+          }
+          discoverer = createDiscoverer(client);
+        }
+      }
+      catch(Exception ex) {
+        LOG.error(ex.getMessage(), ex);
+      }
+      finally {
+        //We always want to set initialize to true because we don't want to 
keep trying to initialize over and over
+        isInitialized = true;
+      }
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return isInitialized;
+    }
+  }
+
+  private static ServiceDiscoverer createDiscoverer(CuratorFramework client) 
throws Exception {
+    MaaSConfig config = ConfigUtil.INSTANCE.read(client, 
"/metron/maas/config", new MaaSConfig(), MaaSConfig.class);
+    ServiceDiscoverer discoverer = new ServiceDiscoverer(client, 
config.getServiceRoot());
+    discoverer.start();
+    return discoverer;
+  }
+
+  @Stellar(name="GET_ENDPOINT"
+          , namespace="MAAS"
+          , description="Inspects ZooKeeper and returns a map containing the 
name, version and url for the model referred to by the input parameters."
+          , params = {
+                      "model_name - The name of the model"
+                     ,"model_version - The optional version of the model.  If 
the model version is not specified, the most current version is used."
+                     }
+          , returns = "A map containing the name, version, and url for the 
REST endpoint (fields named name, version and url).  " +
+                      "Note that the output of this function is suitable for 
input into the first argument of MAAS_MODEL_APPLY."
+          )
+  public static class GetEndpoint implements StellarFunction {
+    ServiceDiscoverer discoverer;
+    private boolean isInitialized = false;
+    private boolean isValidState = false;
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      if(!isValidState) {
+        LOG.error("Invalid state: Unable to find ServiceDiscoverer service.");
+        return null;
+      }
+      String modelName = null;
+      String modelVersion = null;
+      if(args.size() >= 1) {
+        modelName = args.get(0).toString();
+      }
+      if(args.size() >= 2)
+      {
+        modelVersion = args.get(1).toString();
+      }
+      if(modelName == null) {
+        return null;
+      }
+      try {
+        ModelEndpoint ep = null;
+        if (modelVersion == null) {
+          ep = discoverer.getEndpoint(modelName);
+        } else {
+          ep = discoverer.getEndpoint(modelName, modelVersion);
+        }
+        return ep == null ? null : endpointToMap(ep.getName(), 
ep.getVersion(), ep.getEndpoint());
+      }
+      catch(Exception ex) {
+        LOG.error("Unable to discover endpoint: {}", ex.getMessage(), ex);
+        return null;
+      }
+    }
+
+    public static Map<String, String> endpointToMap(String name, String 
version, Endpoint ep) {
+      Map<String, String> ret = new HashMap<>();
+      ret.put("url", ep.getUrl());
+      ret.put("name", name);
+      ret.put("version", version);
+      for(Map.Entry<String, String> kv : ep.getFunctions().entrySet()) {
+        ret.put("endpoint:" + kv.getKey(), kv.getValue());
+      }
+      return ret;
+    }
+
+    @Override
+    public synchronized void initialize(Context context) {
+      try {
+        Optional<Object> clientOptional = 
context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
+        CuratorFramework client = null;
+        if (clientOptional.isPresent() && clientOptional.get() instanceof 
CuratorFramework) {
+          client = (CuratorFramework) clientOptional.get();
+        } else {
+          throw new IllegalStateException("Unable to initialize function: 
Cannot find zookeeper client.");
+        }
+        try {
+          discoverer = createDiscoverer(client);
+          context.addCapability(Context.Capabilities.SERVICE_DISCOVERER, () -> 
discoverer);
+          isValidState = true;
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          throw new IllegalStateException("Unable to initialize 
MAAS_GET_ENDPOINT", e);
+        }
+      }
+      finally {
+        isInitialized = true;
+      }
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return isInitialized;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-service/README.md 
b/metron-analytics/metron-maas-service/README.md
index 5ed108b..d234130 100644
--- a/metron-analytics/metron-maas-service/README.md
+++ b/metron-analytics/metron-maas-service/README.md
@@ -126,16 +126,29 @@ Now let's install some prerequisites:
 Start Squid via `service squid start`
 
 Now that we have flask and jinja, we can create a mock DGA service to deploy 
with MaaS:
-* Download the files in 
[this](https://gist.github.com/cestella/cba10aff0f970078a4c2c8cade3a4d1a) gist 
into the `/root/mock_dga` directory
-* Make `rest.sh` executable via `chmod +x /root/mock_dga/rest.sh`
+* Download the files in 
[this](https://gist.github.com/cestella/cba10aff0f970078a4c2c8cade3a4d1a) gist 
into the `$HOME/mock_dga` directory
+* Make `rest.sh` executable via `chmod +x $HOME/mock_dga/rest.sh`
 
 This service will treat `yahoo.com` and `amazon.com` as legit and everything 
else as malicious.  The contract is that the REST service exposes an endpoint 
`/apply` and returns back JSON maps with a single key `is_malicious` which can 
be `malicious` or `legit`.
 
 ## Deploy Mock DGA Service via MaaS
 
+The following presumes that you are logged in as a user who has a
+home directory in HDFS under `/user/$USER`.  If you do not, please create one
+and ensure the permissions are set appropriately:
+```
+su - hdfs -c "hadoop fs -mkdir /user/$USER"
+su - hdfs -c "hadoop fs -chown $USER:$USER /user/$USER"
+```
+Or, in the common case for the `metron` user:
+```
+su - hdfs -c "hadoop fs -mkdir /user/metron"
+su - hdfs -c "hadoop fs -chown metron:metron /user/metron"
+```
+
 Now let's start MaaS and deploy the Mock DGA Service:
 * Start MaaS via `$METRON_HOME/bin/maas_service.sh -zq node1:2181`
-* Start one instance of the mock DGA model with 512M of memory via 
`$METRON_HOME/bin/maas_deploy.sh -zq node1:2181 -lmp /root/mock_dga -hmp 
/user/root/models -mo ADD -m 512 -n dga -v 1.0 -ni 1`
+* Start one instance of the mock DGA model with 512M of memory via 
`$METRON_HOME/bin/maas_deploy.sh -zq node1:2181 -lmp $HOME/mock_dga -hmp 
/user/$USER/models -mo ADD -m 512 -n dga -v 1.0 -ni 1`
 * As a sanity check:
   * Ensure that the model is running via `$METRON_HOME/bin/maas_deploy.sh -zq 
node1:2181 -mo LIST`.  You should see `Model dga @ 1.0` displayed and, under 
that, a URL such as (but not exactly) `http://node1:36161`
   * Try to hit the model via curl: `curl 
'http://localhost:36161/apply?host=caseystella.com'` and ensure that it returns 
a JSON map indicating the domain is malicious.
@@ -170,8 +183,6 @@ Now that we have a deployed model, let's adjust the 
configurations for the Squid
 * Edit the squid enrichment configuration at 
`$METRON_HOME/config/zookeeper/enrichments/squid.json` (this file will not 
exist, so create a new one) to make the threat triage adjust the level of risk 
based on the model output:
 ```
 {
-  "index": "squid",
-  "batchSize": 1,
   "enrichment" : {
     "fieldMap": {}
   },

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-service/pom.xml 
b/metron-analytics/metron-maas-service/pom.xml
index 0ac9bac..4eeceae 100644
--- a/metron-analytics/metron-maas-service/pom.xml
+++ b/metron-analytics/metron-maas-service/pom.xml
@@ -49,17 +49,6 @@
       <version>${global_kryo_serializers_version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.metron</groupId>
-      <artifactId>stellar-common</artifactId>
-      <version>${project.parent.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-auth</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
       <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
 
b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
deleted file mode 100644
index eacb64d..0000000
--- 
a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/functions/MaaSFunctions.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.maas.functions;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import java.lang.invoke.MethodHandles;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.maas.config.Endpoint;
-import org.apache.metron.maas.config.MaaSConfig;
-import org.apache.metron.maas.config.ModelEndpoint;
-import org.apache.metron.maas.discovery.ServiceDiscoverer;
-import org.apache.metron.maas.util.ConfigUtil;
-import org.apache.metron.maas.util.RESTUtil;
-import org.apache.metron.stellar.common.utils.JSONUtils;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.Stellar;
-import org.apache.metron.stellar.dsl.StellarFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MaaSFunctions {
- protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static class ModelCacheKey {
-    String name;
-    String version;
-    String method;
-    Map<String, String> args;
-    public ModelCacheKey(String name, String version, String method, 
Map<String, String> args) {
-      this.name = name;
-      this.version = version;
-      this.method = method;
-      this.args = args;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      ModelCacheKey that = (ModelCacheKey) o;
-
-      if (name != null ? !name.equals(that.name) : that.name != null) return 
false;
-      if (version != null ? !version.equals(that.version) : that.version != 
null) return false;
-      if (method != null ? !method.equals(that.method) : that.method != null) 
return false;
-      return args != null ? args.equals(that.args) : that.args == null;
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = name != null ? name.hashCode() : 0;
-      result = 31 * result + (version != null ? version.hashCode() : 0);
-      result = 31 * result + (method != null ? method.hashCode() : 0);
-      result = 31 * result + (args != null ? args.hashCode() : 0);
-      return result;
-    }
-  }
-
-  @Stellar(name="MODEL_APPLY"
-          , namespace="MAAS"
-          , description = "Returns the output of a model deployed via Model as 
a Service. NOTE: Results are cached locally for 10 minutes."
-          , params = { "endpoint - A map containing the name, version, and url 
for the REST endpoint"
-                     , "function - The optional endpoint path; default is 
'apply'"
-                     , "model_args - A Dictionary of arguments for the model 
(these become request params)"
-                     }
-          , returns = "The output of the model deployed as a REST endpoint in 
Map form.  Assumes REST endpoint returns a JSON Map."
-          )
-  public static class ModelApply implements StellarFunction {
-    private boolean isInitialized = false;
-    private ServiceDiscoverer discoverer;
-    private Cache<ModelCacheKey, Map<String, Object> > resultCache;
-    public ModelApply() {
-      resultCache = CacheBuilder.newBuilder()
-                            .concurrencyLevel(4)
-                            .weakKeys()
-                            .maximumSize(100000)
-                            .expireAfterWrite(10, TimeUnit.MINUTES)
-                            .build();
-    }
-
-    @Override
-    public Object apply(List<Object> args, Context context) throws 
ParseException {
-      if(args.size() < 2) {
-        throw new ParseException("Unable to execute model_apply. " +
-                                 "Expected arguments: endpoint_map:map, " +
-                                 " [endpoint method:string], model_args:map"
-                                 );
-      }
-      if(!isInitialized) {
-        return null;
-      }
-      int i = 0;
-      if(args.size() == 0) {
-        return null;
-      }
-      Object endpointObj = args.get(i++);
-      Map endpoint = null;
-      String modelName;
-      String modelVersion;
-      String modelUrl;
-      if(endpointObj instanceof Map) {
-        endpoint = (Map)endpointObj;
-        modelName = endpoint.get("name") + "";
-        modelVersion = endpoint.get("version") + "";
-        modelUrl = endpoint.get("url") + "";
-      }
-      else {
-        return null;
-      }
-      String modelFunction = "apply";
-      Map<String, String> modelArgs = new HashMap<>();
-      if(args.get(i) instanceof String) {
-        String func = (String)args.get(i);
-        if(endpoint.containsKey("endpoint:" + func)) {
-          modelFunction = "" + endpoint.get("endpoint:" + func);
-        }
-        else {
-          modelFunction = func;
-        }
-        i++;
-      }
-
-      if(args.get(i) instanceof Map) {
-        if(endpoint.containsKey("endpoint:apply")) {
-          modelFunction = "" + endpoint.get("endpoint:apply");
-        }
-        modelArgs = (Map)args.get(i);
-      }
-      if( modelName == null
-       || modelVersion == null
-       || modelFunction == null
-        ) {
-        return null;
-      }
-      ModelCacheKey cacheKey = new ModelCacheKey(modelName, modelVersion, 
modelFunction, modelArgs);
-      Map<String, Object> ret = resultCache.getIfPresent(cacheKey);
-      if(ret != null) {
-        return ret;
-      }
-      else {
-        String url = modelUrl;
-        if (url.endsWith("/")) {
-          url = url.substring(0, url.length() - 1);
-        }
-        if (modelFunction.startsWith("/")) {
-          modelFunction = modelFunction.substring(1);
-        }
-        try {
-          URL u = new URL(url + "/" + modelFunction);
-
-          String results = RESTUtil.INSTANCE.getRESTJSONResults(u, modelArgs);
-          ret = JSONUtils.INSTANCE.load(results, new TypeReference<Map<String, 
Object>>() {
-          });
-          resultCache.put(cacheKey, ret);
-          return ret;
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          if (discoverer != null) {
-            try {
-              URL u = new URL(modelUrl);
-              discoverer.blacklist(u);
-            } catch (MalformedURLException e1) {
-            }
-          }
-        }
-      }
-      return null;
-    }
-
-    @Override
-    public synchronized void initialize(Context context) {
-
-      try {
-        Optional<ServiceDiscoverer> discovererOpt = (Optional) 
(context.getCapability(Context.Capabilities.SERVICE_DISCOVERER));
-        if (discovererOpt.isPresent()) {
-          discoverer = discovererOpt.get();
-        }
-        else {
-          Optional<Object> clientOptional = 
context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-          CuratorFramework client = null;
-          if (clientOptional.isPresent() && clientOptional.get() instanceof 
CuratorFramework) {
-            client = (CuratorFramework) clientOptional.get();
-          } else {
-            throw new IllegalStateException("Unable to initialize function: 
Cannot find zookeeper client.");
-          }
-          discoverer = createDiscoverer(client);
-        }
-      }
-      catch(Exception ex) {
-        LOG.error(ex.getMessage(), ex);
-      }
-      finally {
-        //We always want to set initialize to true because we don't want to 
keep trying to initialize over and over
-        isInitialized = true;
-      }
-    }
-
-    @Override
-    public boolean isInitialized() {
-      return isInitialized;
-    }
-  }
-
-  private static ServiceDiscoverer createDiscoverer(CuratorFramework client) 
throws Exception {
-    MaaSConfig config = ConfigUtil.INSTANCE.read(client, 
"/metron/maas/config", new MaaSConfig(), MaaSConfig.class);
-    ServiceDiscoverer discoverer = new ServiceDiscoverer(client, 
config.getServiceRoot());
-    discoverer.start();
-    return discoverer;
-  }
-
-  @Stellar(name="GET_ENDPOINT"
-          , namespace="MAAS"
-          , description="Inspects ZooKeeper and returns a map containing the 
name, version and url for the model referred to by the input parameters."
-          , params = {
-                      "model_name - The name of the model"
-                     ,"model_version - The optional version of the model.  If 
the model version is not specified, the most current version is used."
-                     }
-          , returns = "A map containing the name, version, and url for the 
REST endpoint (fields named name, version and url).  " +
-                      "Note that the output of this function is suitable for 
input into the first argument of MAAS_MODEL_APPLY."
-          )
-  public static class GetEndpoint implements StellarFunction {
-    ServiceDiscoverer discoverer;
-    private boolean isInitialized = false;
-    private boolean isValidState = false;
-
-    @Override
-    public Object apply(List<Object> args, Context context) throws 
ParseException {
-      if(!isValidState) {
-        LOG.error("Invalid state: Unable to find ServiceDiscoverer service.");
-        return null;
-      }
-      String modelName = null;
-      String modelVersion = null;
-      if(args.size() >= 1) {
-        modelName = args.get(0).toString();
-      }
-      if(args.size() >= 2)
-      {
-        modelVersion = args.get(1).toString();
-      }
-      if(modelName == null) {
-        return null;
-      }
-      try {
-        ModelEndpoint ep = null;
-        if (modelVersion == null) {
-          ep = discoverer.getEndpoint(modelName);
-        } else {
-          ep = discoverer.getEndpoint(modelName, modelVersion);
-        }
-        return ep == null ? null : endpointToMap(ep.getName(), 
ep.getVersion(), ep.getEndpoint());
-      }
-      catch(Exception ex) {
-        LOG.error("Unable to discover endpoint: {}", ex.getMessage(), ex);
-        return null;
-      }
-    }
-
-    public static Map<String, String> endpointToMap(String name, String 
version, Endpoint ep) {
-      Map<String, String> ret = new HashMap<>();
-      ret.put("url", ep.getUrl());
-      ret.put("name", name);
-      ret.put("version", version);
-      for(Map.Entry<String, String> kv : ep.getFunctions().entrySet()) {
-        ret.put("endpoint:" + kv.getKey(), kv.getValue());
-      }
-      return ret;
-    }
-
-    @Override
-    public synchronized void initialize(Context context) {
-      try {
-        Optional<Object> clientOptional = 
context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-        CuratorFramework client = null;
-        if (clientOptional.isPresent() && clientOptional.get() instanceof 
CuratorFramework) {
-          client = (CuratorFramework) clientOptional.get();
-        } else {
-          throw new IllegalStateException("Unable to initialize function: 
Cannot find zookeeper client.");
-        }
-        try {
-          discoverer = createDiscoverer(client);
-          context.addCapability(Context.Capabilities.SERVICE_DISCOVERER, () -> 
discoverer);
-          isValidState = true;
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          throw new IllegalStateException("Unable to initialize 
MAAS_GET_ENDPOINT", e);
-        }
-      }
-      finally {
-        isInitialized = true;
-      }
-    }
-
-    @Override
-    public boolean isInitialized() {
-      return isInitialized;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java
 
b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java
index c2d8906..9cabf21 100644
--- 
a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java
+++ 
b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Client.java
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.function.Function;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.commons.cli.*;
 import org.apache.commons.cli.CommandLine;
@@ -558,6 +559,7 @@ public class Client {
     // Copy the application master jar to the filesystem
     // Create a local resource to point to the destination jar path
     FileSystem fs = FileSystem.get(conf);
+    createMaaSDirectory(fs, appId.toString());
     Path ajPath = addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null);
 
     // Set the log4j properties if needed
@@ -789,6 +791,18 @@ public class Client {
     yarnClient.killApplication(appId);
   }
 
+  private void createMaaSDirectory(FileSystem fs, String appId) throws IOException {
+    for(Path p : ImmutableList.of(new Path(fs.getHomeDirectory(), appName)
+                                 , new Path(fs.getHomeDirectory(), appName + "/" + appId)
+                                 )
+       ) {
+      if(!fs.exists(p)) {
+        fs.mkdirs(p);
+        fs.setPermission(p, new FsPermission((short)0755));
+      }
+    }
+  }
+
   private Path addToLocalResources(FileSystem fs, String fileSrcPath,
                                   String fileDstPath, String appId, Map<String, LocalResource> localResources,
                                    String resources) throws IOException {
@@ -808,6 +822,7 @@ public class Client {
     } else {
       fs.copyFromLocalFile(new Path(fileSrcPath), dst);
     }
+    fs.setPermission(dst, new FsPermission((short)0755));
     FileStatus scFileStatus = fs.getFileStatus(dst);
     LocalResource scRsrc =
             LocalResource.newInstance(

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java
index ac2c950..d032511 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/Constants.java
@@ -31,5 +31,8 @@ public class Constants {
    * Environment key name denoting the timeline domain ID.
    */
   public static final String TIMELINEDOMAIN = "TIMELINEDOMAIN";
-
+  /*
+  The filename which communicates the endpoint information for a deployed model
+   */
+  public static final String ENDPOINT_DAT = "endpoint.dat";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java
index cc297d2..8f0b9e5 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/runner/Runner.java
@@ -33,6 +33,7 @@ import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.*;
 import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.metron.maas.config.Endpoint;
+import org.apache.metron.maas.service.Constants;
 import org.apache.metron.maas.util.ConfigUtil;
 import org.apache.metron.maas.config.MaaSConfig;
 import org.apache.metron.maas.config.ModelEndpoint;
@@ -202,7 +203,11 @@ public class Runner {
       serviceDiscovery.start();
 
       File cwd = new File(script).getParentFile();
-      final String cmd = new File(cwd, script).getAbsolutePath();
+      File scriptFile = new File(cwd, script);
+      if(scriptFile.exists() && !scriptFile.canExecute()) {
+        scriptFile.setExecutable(true);
+      }
+      final String cmd = scriptFile.getAbsolutePath();
         try {
           p = new ProcessBuilder(cmd).directory(cwd).start();
 
@@ -299,7 +304,7 @@ public class Runner {
 
   private static Endpoint readEndpoint(File cwd) throws Exception {
     String content = "";
-    File f = new File(cwd, "endpoint.dat");
+    File f = new File(cwd, Constants.ENDPOINT_DAT);
     for(int i = 0;i < NUM_ATTEMPTS;i++) {
       if(f.exists()) {
         try {

http://git-wip-us.apache.org/repos/asf/metron/blob/7f9f86c0/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
index ebfa904..fcae40a 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.metron.maas.config.*;
 import org.apache.metron.maas.discovery.ServiceDiscoverer;
+import org.apache.metron.maas.service.Constants;
 import org.apache.metron.maas.service.Log4jPropertyHelper;
 import org.apache.metron.maas.util.ConfigUtil;
 import org.apache.metron.maas.queue.Queue;
@@ -247,6 +248,10 @@ public class ModelSubmission {
         fs.mkdirs(hdfsPath);
       }
       for(File f : localDir.listFiles()) {
+        if(f.getName().equals(Constants.ENDPOINT_DAT)) {
+          //skip the endpoint if it exists accidentally, we don't want to localize that.
+          continue;
+        }
         Path p = new Path(hdfsPath, f.getName());
         FSDataOutputStream out = fs.create(p);
         BufferedInputStream in = new BufferedInputStream(new FileInputStream(f));

Reply via email to