METRON-1850 Stellar REST function (merrimanr) closes apache/metron#1250

Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3e73391e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3e73391e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3e73391e

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 3e73391ed518e6d4e2f87a37745a34581782236b
Parents: b5712af
Author: merrimanr <merrim...@gmail.com>
Authored: Thu Nov 8 16:44:32 2018 -0600
Committer: rmerriman <merrim...@gmail.com>
Committed: Thu Nov 8 16:44:32 2018 -0600

----------------------------------------------------------------------
 metron-analytics/metron-maas-common/pom.xml     |   2 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java   |  14 +
 .../common/bolt/ConfiguredParserBolt.java       |  13 +
 metron-platform/metron-data-management/pom.xml  |   5 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |   1 +
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |   6 -
 metron-stellar/stellar-common/README.md         |  98 ++-
 metron-stellar/stellar-common/pom.xml           |  40 ++
 .../stellar/common/shell/cli/StellarShell.java  |   2 +
 .../org/apache/metron/stellar/dsl/Context.java  |   5 +
 .../stellar/dsl/functions/RestConfig.java       | 166 +++++
 .../stellar/dsl/functions/RestFunctions.java    | 388 ++++++++++++
 .../dsl/functions/RestFunctionsTest.java        | 604 +++++++++++++++++++
 pom.xml                                         |   1 +
 14 files changed, 1334 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-analytics/metron-maas-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-maas-common/pom.xml 
b/metron-analytics/metron-maas-common/pom.xml
index b5cf1c9..062e3db 100644
--- a/metron-analytics/metron-maas-common/pom.xml
+++ b/metron-analytics/metron-maas-common/pom.xml
@@ -62,7 +62,7 @@
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
-      <version>4.3.2</version>
+      <version>${global_httpclient_version}</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.jackson</groupId>

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index c28ca7b..2e03a36 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -17,8 +17,10 @@
  */
 package org.apache.metron.common.bolt;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,4 +33,16 @@ public abstract class ConfiguredEnrichmentBolt extends 
ConfiguredBolt<Enrichment
     super(zookeeperUrl, "ENRICHMENT");
   }
 
+  @Override
+  public void cleanup() {
+    // This method may not be called in production.
+    // See 
https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/task/IBolt.html#cleanup--
 for more detail.
+    super.cleanup();
+    try {
+      StellarFunctions.close();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 14ce50b..17b614b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -17,9 +17,11 @@
  */
 package org.apache.metron.common.bolt;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,4 +38,15 @@ public abstract class ConfiguredParserBolt extends 
ConfiguredBolt<ParserConfigur
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
+  @Override
+  public void cleanup() {
+    // This method may not be called in production.
+    // See 
https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/task/IBolt.html#cleanup--
 for more detail.
+    super.cleanup();
+    try {
+      StellarFunctions.close();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-platform/metron-data-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/pom.xml 
b/metron-platform/metron-data-management/pom.xml
index c4dbb74..a7a5a40 100644
--- a/metron-platform/metron-data-management/pom.xml
+++ b/metron-platform/metron-data-management/pom.xml
@@ -26,7 +26,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <httpcore.version>4.3.2</httpcore.version>
         <lucene.test.version>5.5.0</lucene.test.version>
     </properties>
 
@@ -221,12 +220,12 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpcore</artifactId>
-            <version>${httpcore.version}</version>
+            <version>${global_httpclient_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcore.version}</version>
+            <version>${global_httpclient_version}</version>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 0677453..e0f1f0c 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -271,6 +271,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
 
   @Override
   public void cleanup() {
+    super.cleanup();
     adapter.cleanup();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index cfa101d..fcfa918 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -17,16 +17,12 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import com.google.common.base.Joiner;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore;
-import 
org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore;
-import 
org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
@@ -34,8 +30,6 @@ import org.apache.metron.enrichment.utils.ThreatIntelUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.threatintel.triage.ThreatTriageProcessor;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/README.md
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/README.md 
b/metron-stellar/stellar-common/README.md
index 0f3bb6a..5b0ec17 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -31,6 +31,12 @@ For a variety of components (threat intelligence triage and 
field transformation
     * [Advanced Usage](#advanced-usage)
     * [Implementation](#implementation)
 * [Stellar Configuration](#stellar-configuration)
+* [Stellar REST Client](#stellar-rest-client)
+    * [Configuration](#configuration)
+    * [Security](#security)
+    * [Examples](#examples)
+    * [Latency](#latency)
+    * [Response Handling](#response-handling)
 
 
 ## Introduction
@@ -248,7 +254,8 @@ Where:
 | [ `REDUCE`](#reduce)                                                         
                      |
 | [ `REGEXP_MATCH`](#regexp_match)                                             
                      |
 | [ `REGEXP_GROUP_VAL`](#regexp_group_val)                                     
                      |
-| [ `REGEXP_REPLACE`](#regexp_replace)                                         
                      |
+| [ `REGEXP_REPLACE`](#regexp_replace) |
+| [ `REST_GET`](#rest_get) |
 | [ `ROUND`](#round)                                                           
                      |
 | [ `SAMPLE_ADD`](../../metron-analytics/metron-statistics#sample_add)         
                      |
 | [ `SAMPLE_GET`](../../metron-analytics/metron-statistics#sample_get)         
                      |
@@ -916,6 +923,13 @@ Where:
     * pattern - The proposed regex pattern
     * value - The value to replace the regex pattern
   * Returns: The modified input string with replaced values
+  
+### `REST_GET`
+  * Description: Performs a REST GET request and parses the JSON results into 
a map.
+  * Input:
+    * url - URI to the REST service
+    * rest_config - Optional - Map (in curly braces) of name:value pairs, each 
overriding the global config parameter of the same name. Default is the empty 
Map, meaning no overrides.
+  * Returns: JSON results as a Map
 
 ### `ROUND`
   * Description: Rounds a number to the nearest integer.  This is half-up 
rounding.
@@ -1599,3 +1613,85 @@ that specify what should be included when searching for 
Stellar functions.
 }
 ```
 
+## Stellar REST Client
+
+Stellar provides a REST Client with the `REST_GET` function.  This function 
depends on the Apache HttpComponents library for
+executing HTTP requests.  The syntax is:
+```
+REST_GET( uri , optional config )
+```
+
+### Configuration
+
+The second argument is an optional Map of settings.  The following settings 
are available:
+
+* basic.auth.user - User name for basic authentication.
+* basic.auth.password.path - Path to the basic authentication password file 
stored in HDFS.
+* proxy.host - Proxy host.
+* proxy.port - Proxy port.
+* proxy.basic.auth.user - User name for proxy basic authentication.
+* proxy.basic.auth.password.path - Path to the proxy basic authentication 
password file stored in HDFS.
+* timeout - Stellar enforced hard timeout for the total request time. Defaults 
to 1000 ms.  HttpClient timeouts alone are insufficient to guarantee the hard 
timeout.
+* connect.timeout - Connect timeout exposed by the HttpClient object.
+* connection.request.timeout - Connection request timeout exposed by the 
HttpClient object.
+* socket.timeout - Socket timeout exposed by the HttpClient object.
+* response.codes.allowed - A list of response codes that are allowed.  All 
others will be treated as errors.  Defaults to `200`.
+* empty.content.override - The default value that will be returned on a 
successful request with empty content.  Defaults to null.
+* error.value.override - The default value that will be returned on an error.  
Defaults to null.
+* pooling.max.total - The maximum number of connections in the connection pool.
+* pooling.default.max.per.route - The default maximum number of connections 
per route in the connection pool.
+
+This Map of settings can also be stored in the global config 
`stellar.rest.settings` property.  For example, to configure basic 
authentication
+settings you would add this property to the global config:
+
+```
+{
+  "stellar.rest.settings": {
+    "basic.auth.user": "user",
+    "basic.auth.password.path": "/password/path"
+  }
+}
+```
+
+Any settings passed into the expression will take precedence over the global 
config settings.  The global config settings will take precedence over the 
defaults.
+
+For security purposes, passwords are read from a file in HDFS.  Passwords are 
read as-is, including any newlines or spaces.  Be careful not to include these 
in the file unless they are specifically part of the password.
+
+### Security
+
+At this time, only basic authentication is supported.  
+
+### Examples
+
+Perform a simple GET request with no authentication:
+```
+[Stellar]>>> REST_GET('http://httpbin.org/get')
+{args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, 
Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, 
User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 
136.62.241.236, url=http://httpbin.org/get}
+```
+
+Perform a GET request using basic authentication:
+```
+[Stellar]>>> config := {'basic.auth.user': 'user', 'basic.auth.password.path': 
'/password/path'}
+{basic.auth.user=user, basic.auth.password.path=/password/path}
+[Stellar]>>> REST_GET('http://httpbin.org/basic-auth/user/passwd', config)
+{authenticated=true, user=user}
+```
+
+Perform a GET request using a proxy:
+```
+[Stellar]>>> config := {'proxy.host': 'node1', 'proxy.port': 3128, 
'proxy.basic.auth.user': 'user', 'proxy.basic.auth.password.path': 
'/proxy/password/path'}
+{proxy.basic.auth.password.path=/proxy/password/path, proxy.port=3128, 
proxy.host=node1, proxy.basic.auth.user=user}
+[Stellar]>>> REST_GET('http://httpbin.org/get', config)
+{args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, 
Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, 
User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 
136.62.241.236, url=http://httpbin.org/get}
+```
+
+### Latency
+
+Performing a REST request will introduce latency in a streaming pipeline.  
Therefore this function should only be used for low volume telemetries that are 
unlikely to be
+affected by higher latency operations.  The `timeout` setting can be used to 
guarantee that requests complete within the configured time.
+
+### Response Handling
+
+In cases of HTTP errors, timeouts, etc., this function will log the error and 
return null.  Only a status code of `200` is considered successful
+by default but this can be changed with the `response.codes.allowed` setting.  
Values returned on errors or empty content can be changed from 
+the default value of null using the `error.value.override` and 
`empty.content.override` settings, respectively.

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/pom.xml 
b/metron-stellar/stellar-common/pom.xml
index 42e649d..b8be521 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -35,6 +35,17 @@
           <version>${global_caffeine_version}</version>
         </dependency>
         <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${global_hadoop_version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>asm</groupId>
+              <artifactId>asm</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <version>${global_hadoop_version}</version>
@@ -213,12 +224,41 @@
             <version>1.0.2</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${global_httpclient_version}</version>
+        </dependency>
+        <dependency>
             <!-- junit dependency added with default scope to allow inclusion 
of StellarProcessorUtils in main jar.
                  It is excluded from the uber-jar. -->
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${global_junit_version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.mock-server</groupId>
+          <artifactId>mockserver-netty</artifactId>
+          <version>3.10.8</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>ch.qos.logback</groupId>
+              <artifactId>logback-classic</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.mock-server</groupId>
+          <artifactId>mockserver-client-java</artifactId>
+          <version>3.10.8</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>ch.qos.logback</groupId>
+              <artifactId>logback-classic</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
index c81df61..083bad2 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/cli/StellarShell.java
@@ -36,6 +36,7 @@ import 
org.apache.metron.stellar.common.shell.StellarAutoCompleter;
 import org.apache.metron.stellar.common.shell.StellarResult;
 import org.apache.metron.stellar.common.shell.StellarShellExecutor;
 import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.jboss.aesh.complete.CompleteOperation;
 import org.jboss.aesh.complete.Completion;
 import org.jboss.aesh.console.AeshConsoleCallback;
@@ -337,6 +338,7 @@ public class StellarShell extends AeshConsoleCallback 
implements Completion {
   private void handleQuit() {
     try {
       console.stop();
+      StellarFunctions.close();
     } catch (Throwable e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
index 0f04151..d31580b 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
@@ -67,6 +67,11 @@ public class Context implements Serializable {
      * is in the cache, then the cached result will be returned instead of 
recomputing.
      */
     CACHE
+    ,
+    /**
+     * This capability indicates that an HTTP client (i.e. a 
CloseableHttpClient, specifically) is available.
+     */
+    HTTP_CLIENT
   }
 
   public enum ActivityType {

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
new file mode 100644
index 0000000..62d89f7
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A Map containing the Stellar REST settings.
+ */
+public class RestConfig extends HashMap<String, Object> {
+
+  /**
+   * A global config prefix used for storing Stellar REST settings.
+   */
+  public final static String STELLAR_REST_SETTINGS = "stellar.rest.settings";
+
+  /**
+   * User name for basic authentication.
+   */
+  public final static String BASIC_AUTH_USER = "basic.auth.user";
+
+  /**
+   * Path to the basic authentication password file stored in HDFS.
+   */
+  public final static String BASIC_AUTH_PASSWORD_PATH = 
"basic.auth.password.path";
+
+  /**
+   * Proxy host.
+   */
+  public final static String PROXY_HOST = "proxy.host";
+
+  /**
+   * Proxy port.
+   */
+  public final static String PROXY_PORT = "proxy.port";
+
+  /**
+   * User name for proxy basic authentication.
+   */
+  public final static String PROXY_BASIC_AUTH_USER = "proxy.basic.auth.user";
+
+  /**
+   * Path to the proxy basic authentication password file stored in HDFS.
+   */
+  public final static String PROXY_BASIC_AUTH_PASSWORD_PATH = 
"proxy.basic.auth.password.path";
+
+  /**
+   * Hard timeout for the total request time.
+   */
+  public final static String TIMEOUT = "timeout";
+
+  /**
+   * Timeouts exposed by the HttpClient object.
+   */
+  public final static String CONNECT_TIMEOUT = "connect.timeout";
+  public final static String CONNECTION_REQUEST_TIMEOUT = 
"connection.request.timeout";
+  public final static String SOCKET_TIMEOUT = "socket.timeout";
+
+  /**
+   * A list of response codes that are allowed.  All others will be treated as 
errors.
+   */
+  public final static String RESPONSE_CODES_ALLOWED = "response.codes.allowed";
+
+  /**
+   * The default value that will be returned on a successful request with 
empty content.  Default is null.
+   */
+  public final static String EMPTY_CONTENT_OVERRIDE = "empty.content.override";
+
+  /**
+   * The default value that will be returned on an error.  Default is null.
+   */
+  public final static String ERROR_VALUE_OVERRIDE = "error.value.override";
+
+  /**
+   * The maximum number of connections in the connection pool.
+   */
+  public final static String POOLING_MAX_TOTAL = "pooling.max.total";
+
+  /**
+   * The default maximum number of connections per route in the connection 
pool.
+   */
+  public final static String POOLING_DEFAULT_MAX_PER_RUOTE = 
"pooling.default.max.per.route";
+
+
+  public RestConfig() {
+    put(TIMEOUT, 1000);
+    put(RESPONSE_CODES_ALLOWED, Collections.singletonList(200));
+  }
+
+  public String getBasicAuthUser() {
+    return (String) get(BASIC_AUTH_USER);
+  }
+
+  public String getBasicAuthPasswordPath() {
+    return (String) get(BASIC_AUTH_PASSWORD_PATH);
+  }
+
+  public String getProxyHost() {
+    return (String) get(PROXY_HOST);
+  }
+
+  public Integer getProxyPort() {
+    return (Integer) get(PROXY_PORT);
+  }
+
+  public String getProxyBasicAuthUser() {
+    return (String) get(PROXY_BASIC_AUTH_USER);
+  }
+
+  public String getProxyBasicAuthPasswordPath() {
+    return (String) get(PROXY_BASIC_AUTH_PASSWORD_PATH);
+  }
+
+  public Integer getTimeout() {
+    return (Integer) get(TIMEOUT);
+  }
+
+  public Integer getConnectTimeout() {
+    return (Integer) get(CONNECT_TIMEOUT);
+  }
+
+  public Integer getConnectionRequestTimeout() {
+    return (Integer) get(CONNECTION_REQUEST_TIMEOUT);
+  }
+
+  public Integer getSocketTimeout() {
+    return (Integer) get(SOCKET_TIMEOUT);
+  }
+
+  public List<Integer> getResponseCodesAllowed() {
+    return (List<Integer>) get(RESPONSE_CODES_ALLOWED);
+  }
+
+  public Object getEmptyContentOverride() {
+    return get(EMPTY_CONTENT_OVERRIDE);
+  }
+
+  public Object getErrorValueOverride() {
+    return get(ERROR_VALUE_OVERRIDE);
+  }
+
+  public Integer getPoolingMaxTotal() {
+    return (Integer) get(POOLING_MAX_TOTAL);
+  }
+
+  public Integer getPoolingDefaultMaxPerRoute() {
+    return (Integer) get(POOLING_DEFAULT_MAX_PER_RUOTE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
new file mode 100644
index 0000000..7134bfc
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+
+/**
+ * Defines functions that enable REST requests with proper result and error handling.  Each function builds its own
+ * Apache HttpComponents client backed by a pooling connection manager when it is initialized.  Exposes various Http
+ * settings including authentication, proxy and timeouts through the global config with the option to override any
+ * settings through a config object supplied in the expression.
+ */
+public class RestFunctions {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("Expected at least %d 
argument(s), found %d", index+1, args.size()));
+    }
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  @Stellar(
+          namespace = "REST",
+          name = "GET",
+          description = "Performs a REST GET request and parses the JSON 
results into a map.",
+          params = {
+                  "url - URI to the REST service",
+                  "rest_config - Optional - Map (in curly braces) of 
name:value pairs, each overriding the global config parameter " +
+                          "of the same name. Default is the empty Map, meaning 
no overrides."
+          },
+          returns = "JSON results as a Map")
+  public static class RestGet implements StellarFunction {
+
+    /**
+     * Whether the function has been initialized.
+     */
+    private boolean initialized = false;
+
+    /**
+     * The CloseableHttpClient.
+     */
+    private CloseableHttpClient httpClient;
+
+    /**
+     * Executor used to impose a hard request timeout.
+     */
+    private ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * Prepares the function for use: creates the single-threaded scheduler used to enforce the request timeout,
+     * builds the CloseableHttpClient from the settings found in the Stellar context and marks the function as
+     * initialized.
+     *
+     * @param context The Stellar context that supplies the global config.
+     */
+    @Override
+    public void initialize(Context context) {
+      this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+      this.httpClient = getHttpClient(context);
+      this.initialized = true;
+    }
+
+    /**
+     * @return true once {@code initialize} has completed.
+     */
+    @Override
+    public boolean isInitialized() {
+      return this.initialized;
+    }
+
+    /**
+     * Executes the REST GET described by the arguments.
+     *
+     * @param args The function arguments: the uri followed by an optional rest config map.
+     * @param context The Stellar context.
+     * @return The result of the request, or the configured error value override when an IOException occurs.
+     * @throws IllegalArgumentException if the uri argument is missing or is not a valid URI.
+     */
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      // Begin with an empty config so the IOException handler below always has one to consult
+      RestConfig config = new RestConfig();
+      try {
+        final String rawUri = getArg(0, String.class, args);
+        final URI requestUri = new URI(rawUri);
+        config = getRestConfig(args, getGlobalConfig(context));
+
+        final HttpHost targetHost = new HttpHost(requestUri.getHost(), requestUri.getPort(), requestUri.getScheme());
+        final Optional<HttpHost> proxyHost = getProxy(config);
+        final HttpClientContext clientContext = getHttpClientContext(config, targetHost, proxyHost);
+
+        final HttpGet request = new HttpGet(requestUri);
+        request.addHeader("Accept", "application/json");
+        request.setConfig(getRequestConfig(config, proxyHost));
+
+        return doGet(config, request, clientContext);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return config.getErrorValueOverride();
+      }
+    }
+
+    /**
+     * Releases the resources created by {@code initialize}: closes the http client and shuts down the scheduler
+     * used for request timeouts.
+     *
+     * @throws IOException if closing the http client fails; the scheduler is shut down regardless.
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        if (httpClient != null) {
+          httpClient.close();
+        }
+      } finally {
+        // Shut the scheduler down even when closing the client throws, otherwise it would never be released
+        if (scheduledExecutorService != null) {
+          scheduledExecutorService.shutdown();
+        }
+      }
+    }
+
+    /**
+     * Retrieves the ClosableHttpClient from a pooling connection manager.
+     *
+     * @param context The execution context.
+     * @return A ClosableHttpClient.
+     */
+    protected CloseableHttpClient getHttpClient(Context context) {
+      RestConfig restConfig = getRestConfig(Collections.emptyList(), 
getGlobalConfig(context));
+
+      PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
+
+      return HttpClients.custom()
+              .setConnectionManager(cm)
+              .build();
+    }
+
+    protected PoolingHttpClientConnectionManager 
getConnectionManager(RestConfig restConfig) {
+      PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager();
+      if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
+        cm.setMaxTotal(restConfig.getPoolingMaxTotal());
+      }
+      if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
+        cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
+      }
+      return cm;
+    }
+
+    /**
+     * Replaces the http client used by this function.  Only used for testing.
+     *
+     * @param client The client that subsequent requests are executed with.
+     */
+    protected void setHttpClient(CloseableHttpClient client) {
+      httpClient = client;
+    }
+
+    /**
+     * Perform the HttpClient get and handle the results.  A configurable list of status codes is accepted and the
+     * response content (expected to be json) is parsed into a Map.  The values returned on errors and when the
+     * response content is empty are also configurable.  The rest config "timeout" setting is imposed in this method
+     * and will abort the get request if exceeded.  The response is always closed before this method returns so the
+     * underlying connection is released.
+     *
+     * @param restConfig The resolved rest config for this request.
+     * @param httpGet The request to execute.
+     * @param httpClientContext The context carrying any credentials for the request.
+     * @return The response content parsed into a Map, or the empty content override when there is no content.
+     * @throws IOException If the request fails, is aborted because the timeout was exceeded, or completes with a
+     *                     status code that is not in the list of allowed codes.
+     */
+    private Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException {
+
+      // Schedule a command to abort the httpGet request if the timeout is exceeded
+      ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
+      CloseableHttpResponse response;
+      try {
+        response = httpClient.execute(httpGet, httpClientContext);
+      } catch (IOException e) {
+        // Report a timeout if the httpGet request was aborted.  Otherwise rethrow exception.
+        if (httpGet.isAborted()) {
+          throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpGet.getURI().toString(), restConfig.getTimeout()), e);
+        }
+        throw e;
+      } finally {
+        // execute() has either returned or failed, so the pending abort is no longer needed
+        scheduledFuture.cancel(true);
+      }
+
+      // try-with-resources closes the response, which releases its connection
+      try (CloseableHttpResponse closeableResponse = response) {
+        int statusCode = closeableResponse.getStatusLine().getStatusCode();
+        HttpEntity httpEntity = closeableResponse.getEntity();
+
+        // Read the content instead of trusting getContentLength(), which is negative when the length is unknown
+        // (e.g. chunked responses).  UTF-8 is used when the response does not declare a charset.
+        String content = httpEntity == null ? null : EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
+
+        if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
+          // Parse the response if present, return the empty value override if not
+          if (content != null && !content.isEmpty()) {
+            return JSONUtils.INSTANCE.load(content, JSONUtils.MAP_SUPPLIER);
+          }
+          return restConfig.getEmptyContentOverride();
+        }
+
+        throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
+                "failed with http status code %d: %s",
+                httpGet.getURI().toString(),
+                restConfig.getResponseCodesAllowed().toString(),
+                statusCode,
+                content == null ? "" : content));
+      }
+    }
+
+    private Map<String, Object> getGlobalConfig(Context context) {
+      Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, 
false);
+      return globalCapability.map(o -> (Map<String, Object>) 
o).orElseGet(HashMap::new);
+    }
+
+    /**
+     * Build the RestConfig object using the following order of precedence:
+     * <ul>
+     *   <li>rest config supplied as an expression parameter</li>
+     *   <li>rest config stored in the global config</li>
+     *   <li>default rest config</li>
+     * </ul>
+     * Only settings specified in the rest config will override lower priority 
config settings.
+     * @param args
+     * @param globalConfig
+     * @return
+     * @throws IOException
+     */
+    protected RestConfig getRestConfig(List<Object> args, Map<String, Object> 
globalConfig) {
+      Map<String, Object> globalRestConfig = (Map<String, Object>) 
globalConfig.get(STELLAR_REST_SETTINGS);
+      Map<String, Object> functionRestConfig = null;
+      if (args.size() > 1) {
+        functionRestConfig = getArg(1, Map.class, args);
+      }
+
+      // Add settings in order of precedence
+      RestConfig restConfig = new RestConfig();
+      if (globalRestConfig != null) {
+        restConfig.putAll(globalRestConfig);
+      }
+      if (functionRestConfig != null) {
+        restConfig.putAll(functionRestConfig);
+      }
+      return restConfig;
+    }
+
+    /**
+     * Returns the proxy HttpHost object if the proxy rest config settings are 
set.
+     * @param restConfig
+     * @return
+     */
+    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
+      Optional<HttpHost> proxy = Optional.empty();
+      if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != 
null) {
+        proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), 
restConfig.getProxyPort(), "http"));
+      }
+      return proxy;
+    }
+
+    /**
+     * Builds the RequestConfig object by setting HttpClient settings defined 
in the rest config.
+     * @param restConfig
+     * @param proxy
+     * @return
+     */
+    protected RequestConfig getRequestConfig(RestConfig restConfig, 
Optional<HttpHost> proxy) {
+      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+      if (restConfig.getConnectTimeout() != null) {
+        requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
+      }
+      if (restConfig.getConnectionRequestTimeout() != null) {
+        
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+      }
+      if (restConfig.getSocketTimeout() != null) {
+        requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
+      }
+
+      proxy.ifPresent(requestConfigBuilder::setProxy);
+      return requestConfigBuilder.build();
+    }
+
+    /**
+     * Builds the HttpClientContext object by setting the basic auth and/or 
proxy basic auth credentials when the
+     * necessary rest config settings are configured.  Passwords are stored in 
HDFS.
+     * @param restConfig
+     * @param target
+     * @param proxy
+     * @return
+     * @throws IOException
+     */
+    protected HttpClientContext getHttpClientContext(RestConfig restConfig, 
HttpHost target, Optional<HttpHost> proxy) throws IOException {
+      HttpClientContext httpClientContext = HttpClientContext.create();
+      boolean credentialsAdded = false;
+      CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+      // Add the basic auth credentials if the rest config settings are present
+      if (restConfig.getBasicAuthUser() != null && 
restConfig.getBasicAuthPasswordPath() != null) {
+        String password = new String(readBytes(new 
Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+        credentialsProvider.setCredentials(
+                new AuthScope(target),
+                new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), 
password));
+        credentialsAdded = true;
+      }
+
+      // Add the proxy basic auth credentials if the rest config settings are 
present
+      if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
+              restConfig.getProxyBasicAuthPasswordPath() != null) {
+        String password = new String(readBytes(new 
Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+        credentialsProvider.setCredentials(
+                new AuthScope(proxy.get()),
+                new 
UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
+        credentialsAdded = true;
+      }
+      if (credentialsAdded) {
+        httpClientContext.setCredentialsProvider(credentialsProvider);
+      }
+      return httpClientContext;
+    }
+
+    /**
+     * Read bytes from a HDFS path.
+     * @param inPath
+     * @return
+     * @throws IOException
+     */
+    private byte[] readBytes(Path inPath) throws IOException {
+      FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
+      try (FSDataInputStream inputStream = fs.open(inPath)) {
+        return IOUtils.toByteArray(inputStream);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
 
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
new file mode 100644
index 0000000..ba80f02
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.junit.MockServerRule;
+import org.mockserver.junit.ProxyRule;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_PASSWORD_PATH;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_USER;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.CONNECT_TIMEOUT;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_PASSWORD_PATH;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_USER;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_HOST;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_PORT;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.SOCKET_TIMEOUT;
+import static 
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+/**
+ * Tests the RestFunctions class.
+ */
+public class RestFunctionsTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public MockServerRule mockServerRule = new MockServerRule(this);
+
+  @Rule
+  public ProxyRule proxyRule = new ProxyRule(1080, this);
+
+  private MockServerClient mockServerClient;
+  private String getUri;
+  private Context context;
+
+  private String basicAuthPasswordPath = "./target/basicAuth.txt";
+  private String basicAuthPassword = "password";
+  private String proxyBasicAuthPasswordPath = "./target/proxyBasicAuth.txt";
+  private String proxyAuthPassword = "proxyPassword";
+
+  /**
+   * Creates a Stellar context with an empty global config, writes the password files used by the basic auth
+   * settings and registers the default mock server expectation.
+   */
+  @Before
+  public void setup() throws Exception {
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
+
+    // Store the passwords in the local file system
+    FileUtils.writeStringToFile(new File(basicAuthPasswordPath), basicAuthPassword, StandardCharsets.UTF_8);
+    FileUtils.writeStringToFile(new File(proxyBasicAuthPasswordPath), proxyAuthPassword, StandardCharsets.UTF_8);
+
+    // By default, the mock server expects a GET request with the path set to /get
+    getUri = String.format("http://localhost:%d/get", mockServerRule.getPort());
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+  }
+
+  /**
+   * REST_GET should issue the get request and return the parsed JSON response.
+   */
+  @Test
+  public void restGetShouldSucceed() throws Exception {
+    String expression = String.format("REST_GET('%s')", getUri);
+    Map<String, Object> result = (Map<String, Object>) run(expression, context);
+
+    assertEquals(1, result.size());
+    assertEquals("success", result.get("get"));
+  }
+
+  /**
+   * REST_GET should issue the get request with proxy settings present in the global config and return the
+   * parsed JSON response.
+   */
+  @Test
+  public void restGetShouldSucceedWithProxy() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"proxyGet\":\"success\"}"));
+
+    // NOTE(review): RestGet.getRestConfig reads its settings from the STELLAR_REST_SETTINGS entry of the global
+    // config, but the proxy settings are stored at the top level here - confirm the proxy is actually exercised.
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> {
+      Map<String, Object> globalConfig = new HashMap<>();
+      globalConfig.put(PROXY_HOST, "localhost");
+      globalConfig.put(PROXY_PORT, proxyRule.getHttpPort());
+      return globalConfig;
+    });
+
+    String expression = String.format("REST_GET('%s')", getUri);
+    Map<String, Object> result = (Map<String, Object>) run(expression, context);
+
+    assertEquals(1, result.size());
+    assertEquals("success", result.get("proxyGet"));
+  }
+
+  /**
+   * REST_GET should return null by default when the response status code is not allowed.
+   */
+  @Test
+  public void restGetShouldHandleErrorStatusCode() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(403));
+
+    Object result = run(String.format("REST_GET('%s')", getUri), context);
+    assertNull(result);
+  }
+
+  /**
+   * {
+   *   "response.codes.allowed": [200,404],
+   *   "empty.content.override": {}
+   * }
+   */
+  @Multiline
+  private String emptyContentOverride;
+
+  /**
+   * REST_GET should return the empty content override setting when the status code is allowed and the
+   * response has no content.
+   */
+  @Test
+  public void restGetShouldReturnEmptyContentOverride() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(404));
+
+    String expression = String.format("REST_GET('%s', %s)", getUri, emptyContentOverride);
+    Object result = run(expression, context);
+    assertEquals(new HashMap<>(), result);
+  }
+
+  /**
+   * {
+   *   "error.value.override": "error message"
+   * }
+   */
+  @Multiline
+  private String errorValueOverride;
+
+  /**
+   * The REST_GET function should return the error value override setting on 
error.
+   */
+  @Test
+  public void restGetShouldReturnErrorValueOverride() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(500));
+
+    Object result = run(String.format("REST_GET('%s', %s)", getUri, 
errorValueOverride), context);
+    assertEquals("error message" , result);
+  }
+
+  /**
+   * getProxy should only return a proxy HttpHost when both the proxy host and the proxy port are set.
+   */
+  @Test
+  public void restGetShouldGetProxy() {
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+
+    // Neither setting present
+    RestConfig noProxySettings = new RestConfig();
+    assertEquals(Optional.empty(), restGet.getProxy(noProxySettings));
+
+    // Host only
+    RestConfig hostOnly = new RestConfig();
+    hostOnly.put(PROXY_HOST, "localhost");
+    assertEquals(Optional.empty(), restGet.getProxy(hostOnly));
+
+    // Port only
+    RestConfig portOnly = new RestConfig();
+    portOnly.put(PROXY_PORT, proxyRule.getHttpPort());
+    assertEquals(Optional.empty(), restGet.getProxy(portOnly));
+
+    // Host and port
+    RestConfig hostAndPort = new RestConfig();
+    hostAndPort.put(PROXY_HOST, "localhost");
+    hostAndPort.put(PROXY_PORT, proxyRule.getHttpPort());
+    Optional<HttpHost> proxy = restGet.getProxy(hostAndPort);
+    assertEquals(new HttpHost("localhost", proxyRule.getHttpPort()), proxy.get());
+  }
+
+  /**
+   * The REST_GET function should return settings in the correct order of 
precedence.
+   * @throws Exception
+   */
+  @Test
+  public void restGetShouldGetRestConfig() throws Exception {
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+
+    {
+      // Test for default timeout
+      RestConfig restConfig = 
restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
+
+      assertEquals(2, restConfig.size());
+      assertEquals(1000, restConfig.getTimeout().intValue());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertNull(restConfig.getBasicAuthUser());
+    }
+
+    Map<String, Object> globalRestConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(SOCKET_TIMEOUT, 2000);
+        put(BASIC_AUTH_USER, "globalUser");
+        put(PROXY_HOST, "globalHost");
+      }});
+    }};
+
+    // Global config settings should take effect
+    {
+      RestConfig restConfig = 
restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
+
+      assertEquals(5, restConfig.size());
+      assertEquals(1000, restConfig.getTimeout().intValue());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(2000, restConfig.getSocketTimeout().intValue());
+      assertEquals("globalUser", restConfig.getBasicAuthUser());
+      assertEquals("globalHost", restConfig.getProxyHost());
+    }
+
+    Map<String, Object> functionRestConfig = new HashMap<String, Object>() {{
+      put(SOCKET_TIMEOUT, 1);
+      put(BASIC_AUTH_USER, "functionUser");
+      put(TIMEOUT, 100);
+    }};
+
+
+    // Function call settings should override global settings
+    {
+      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", 
functionRestConfig), globalRestConfig);
+
+      assertEquals(5, restConfig.size());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(100, restConfig.getTimeout().intValue());
+      assertEquals(1, restConfig.getSocketTimeout().intValue());
+      assertEquals("functionUser", restConfig.getBasicAuthUser());
+      assertEquals("globalHost", restConfig.getProxyHost());
+    }
+
+    functionRestConfig = new HashMap<String, Object>() {{
+      put(BASIC_AUTH_USER, "functionUser");
+      put(TIMEOUT, 100);
+    }};
+
+    // New function call settings should take effect with global settings 
staying the same
+    {
+      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", 
functionRestConfig), globalRestConfig);
+
+      assertEquals(5, restConfig.size());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(100, restConfig.getTimeout().intValue());
+      assertEquals(2000, restConfig.getSocketTimeout().intValue());
+      assertEquals("functionUser", restConfig.getBasicAuthUser());
+      assertEquals("globalHost", restConfig.getProxyHost());
+    }
+
+    globalRestConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(SOCKET_TIMEOUT, 2000);
+        put(BASIC_AUTH_USER, "globalUser");
+      }});
+    }};
+
+    // New global settings should take effect with function call settings 
staying the same
+    {
+      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", 
functionRestConfig), globalRestConfig);
+
+      assertEquals(4, restConfig.size());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(100, restConfig.getTimeout().intValue());
+      assertEquals(2000, restConfig.getSocketTimeout().intValue());
+      assertEquals("functionUser", restConfig.getBasicAuthUser());
+    }
+
+    // Should fall back to global settings on missing function call config
+    {
+      RestConfig restConfig = 
restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
+
+      assertEquals(4, restConfig.size());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(1000, restConfig.getTimeout().intValue());
+      assertEquals(2000, restConfig.getSocketTimeout().intValue());
+      assertEquals("globalUser", restConfig.getBasicAuthUser());
+    }
+
+    // Should fall back to default settings on missing global settings
+    {
+      RestConfig restConfig = 
restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
+
+      assertEquals(2, restConfig.size());
+      assertEquals(Collections.singletonList(200), 
restConfig.getResponseCodesAllowed());
+      assertEquals(1000, restConfig.getTimeout().intValue());
+    }
+
+  }
+
+  /**
+   * getRequestConfig should apply the HttpClient timeout settings and the proxy to the RequestConfig.
+   */
+  @Test
+  public void restGetShouldGetRequestConfig() {
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+
+    // Without any settings the result should match an untouched RequestConfig
+    RequestConfig defaultExpected = RequestConfig.custom().build();
+    RequestConfig defaultActual = restGet.getRequestConfig(new RestConfig(), Optional.empty());
+
+    assertEquals(defaultExpected.getConnectTimeout(), defaultActual.getConnectTimeout());
+    assertEquals(defaultExpected.getConnectionRequestTimeout(), defaultActual.getConnectionRequestTimeout());
+    assertEquals(defaultExpected.getSocketTimeout(), defaultActual.getSocketTimeout());
+    assertEquals(defaultExpected.getProxy(), defaultActual.getProxy());
+
+    // With all timeouts and a proxy configured
+    RestConfig configured = new RestConfig();
+    configured.put(CONNECT_TIMEOUT, 1);
+    configured.put(CONNECTION_REQUEST_TIMEOUT, 2);
+    configured.put(SOCKET_TIMEOUT, 3);
+    HttpHost proxyHost = new HttpHost("localhost", proxyRule.getHttpPort());
+
+    RequestConfig configuredActual = restGet.getRequestConfig(configured, Optional.of(proxyHost));
+    RequestConfig configuredExpected = RequestConfig.custom()
+            .setConnectTimeout(1)
+            .setConnectionRequestTimeout(2)
+            .setSocketTimeout(3)
+            .setProxy(proxyHost)
+            .build();
+
+    assertEquals(configuredExpected.getConnectTimeout(), configuredActual.getConnectTimeout());
+    assertEquals(configuredExpected.getConnectionRequestTimeout(), configuredActual.getConnectionRequestTimeout());
+    assertEquals(configuredExpected.getSocketTimeout(), configuredActual.getSocketTimeout());
+    assertEquals(configuredExpected.getProxy(), configuredActual.getProxy());
+  }
+
+  /**
+   * Checks the credentials REST_GET places in the {@code HttpClientContext} for four
+   * configurations: no basic auth settings, target basic auth only, proxy basic auth only,
+   * and both together. In each authenticated case the credentials registered for the target
+   * scope and for the proxy scope are compared against a hand-built provider.
+   * @throws Exception
+   */
+  @Test
+  public void restGetShouldGetHttpClientContext() throws Exception {
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+    HttpHost target = new HttpHost("localhost", mockServerRule.getPort());
+    HttpHost proxy = new HttpHost("localhost", proxyRule.getHttpPort());
+    AuthScope targetScope = new AuthScope(target);
+    AuthScope proxyScope = new AuthScope(proxy);
+
+    // No basic auth settings: no credentials provider is expected on the context
+    {
+      HttpClientContext actual =
+              restGet.getHttpClientContext(new RestConfig(), target, Optional.empty());
+
+      assertNull(actual.getCredentialsProvider());
+    }
+
+    // Target basic auth only, no proxy
+    {
+      RestConfig restConfig = new RestConfig();
+      restConfig.put(BASIC_AUTH_USER, "user");
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+
+      HttpClientContext actual =
+              restGet.getHttpClientContext(restConfig, target, Optional.empty());
+
+      CredentialsProvider wanted = new BasicCredentialsProvider();
+      wanted.setCredentials(targetScope,
+              new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), basicAuthPassword));
+      HttpClientContext expected = HttpClientContext.create();
+      expected.setCredentialsProvider(wanted);
+
+      assertEquals(expected.getCredentialsProvider().getCredentials(targetScope),
+              actual.getCredentialsProvider().getCredentials(targetScope));
+      assertEquals(expected.getCredentialsProvider().getCredentials(proxyScope),
+              actual.getCredentialsProvider().getCredentials(proxyScope));
+    }
+
+    // Proxy basic auth only
+    {
+      RestConfig restConfig = new RestConfig();
+      restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+
+      HttpClientContext actual =
+              restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+
+      CredentialsProvider wanted = new BasicCredentialsProvider();
+      wanted.setCredentials(proxyScope,
+              new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), proxyAuthPassword));
+      HttpClientContext expected = HttpClientContext.create();
+      expected.setCredentialsProvider(wanted);
+
+      assertEquals(expected.getCredentialsProvider().getCredentials(targetScope),
+              actual.getCredentialsProvider().getCredentials(targetScope));
+      assertEquals(expected.getCredentialsProvider().getCredentials(proxyScope),
+              actual.getCredentialsProvider().getCredentials(proxyScope));
+    }
+
+    // Target and proxy basic auth together
+    {
+      RestConfig restConfig = new RestConfig();
+      restConfig.put(BASIC_AUTH_USER, "user");
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+      restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+
+      HttpClientContext actual =
+              restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+
+      CredentialsProvider wanted = new BasicCredentialsProvider();
+      wanted.setCredentials(targetScope,
+              new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), basicAuthPassword));
+      wanted.setCredentials(proxyScope,
+              new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), proxyAuthPassword));
+      HttpClientContext expected = HttpClientContext.create();
+      expected.setCredentialsProvider(wanted);
+
+      assertEquals(expected.getCredentialsProvider().getCredentials(targetScope),
+              actual.getCredentialsProvider().getCredentials(targetScope));
+      assertEquals(expected.getCredentialsProvider().getCredentials(proxyScope),
+              actual.getCredentialsProvider().getCredentials(proxyScope));
+    }
+  }
+
+  /**
+   * The REST_GET function should timeout and return null.
+   * A mock GET endpoint is registered at {@code /get}, the global Stellar REST settings are
+   * given a {@code TIMEOUT} of 1 (unit not visible here, presumably milliseconds), and the
+   * call is then expected to yield null.
+   */
+  @Test
+  public void restGetShouldTimeout() {
+    String uri = String.format("http://localhost:%d/get", mockServerRule.getPort());
+
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(TIMEOUT, 1);
+      }});
+    }};
+
+    // Expose the settings above as the global config seen by the Stellar function
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", uri), context);
+    assertNull(actual);
+  }
+
+  /**
+   * {
+   * "timeout": 1
+   * }
+   */
+  @Multiline
+  private String timeoutConfig;
+
+  /**
+   * REST_GET should return null when the timeout is passed as the second argument of the
+   * function call (the {@code timeoutConfig} text) rather than through the global settings.
+   */
+  @Test
+  public void restGetShouldTimeoutWithSuppliedTimeout() {
+    String stellarCall = String.format("REST_GET('%s', %s)", getUri, timeoutConfig);
+
+    Map<String, Object> response = (Map<String, Object>) run(stellarCall, context);
+
+    assertNull(response);
+  }
+
+  /**
+   * The REST_GET function should throw an exception on a malformed uri.
+   * The expected failure is a {@code ParseException} whose message contains the
+   * "Illegal character in path" detail for the invalid uri.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restGetShouldHandleURISyntaxException()
+          throws IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    // One logical message split by concatenation; a string literal cannot span physical lines
+    thrown.expectMessage("Unable to parse REST_GET('some invalid uri'): "
+            + "Unable to parse: REST_GET('some invalid uri') due to: "
+            + "Illegal character in path at index 4: some invalid uri");
+
+    run("REST_GET('some invalid uri')", context);
+  }
+
+  /**
+   * REST_GET should swallow IOExceptions and yield null. The global Stellar REST settings
+   * are given a {@code SOCKET_TIMEOUT} of 1 before the function is called against
+   * {@code getUri}.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restGetShouldHandleIOException()
+          throws IllegalArgumentException, IOException {
+    Map<String, Object> restSettings = new HashMap<String, Object>();
+    restSettings.put(SOCKET_TIMEOUT, 1);
+
+    Map<String, Object> globalConfig = new HashMap<String, Object>();
+    globalConfig.put(STELLAR_REST_SETTINGS, restSettings);
+
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    String stellarCall = String.format("REST_GET('%s')", getUri);
+    Object result = run(stellarCall, context);
+    Assert.assertNull(result);
+  }
+
+  /**
+   * The REST_GET function should throw an exception when the required uri parameter is
+   * missing. The expected failure is a {@code ParseException} reporting that at least one
+   * argument was expected and none were found.
+   */
+  @Test
+  public void restGetShouldThrownExceptionOnMissingParameter() {
+    thrown.expect(ParseException.class);
+    // One logical message split by concatenation; a string literal cannot span physical lines
+    thrown.expectMessage("Unable to parse REST_GET(): Unable to parse: "
+            + "REST_GET() due to: Expected at least 1 argument(s), found 0");
+
+    run("REST_GET()", context);
+  }
+
+  /**
+   * The connection manager REST_GET creates should report the pooling limits supplied in
+   * the {@code RestConfig}: a max total of 5 and a default max per route of 2.
+   */
+  @Test
+  public void restGetShouldGetPoolingConnectionManager() {
+    RestConfig poolSettings = new RestConfig();
+    poolSettings.put(POOLING_MAX_TOTAL, 5);
+    // Constant name is spelled as it is referenced here; its declaration is outside this view
+    poolSettings.put(POOLING_DEFAULT_MAX_PER_RUOTE, 2);
+
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+    PoolingHttpClientConnectionManager manager = restGet.getConnectionManager(poolSettings);
+
+    assertEquals(5, manager.getMaxTotal());
+    assertEquals(2, manager.getDefaultMaxPerRoute());
+  }
+
+  /**
+   * Closing REST_GET should close the HttpClient it holds: after {@code close()} the mocked
+   * client must have seen exactly one {@code close()} call and no other interaction.
+   * @throws Exception
+   */
+  @Test
+  public void restGetShouldCloseHttpClient() throws Exception {
+    CloseableHttpClient mockedClient = mock(CloseableHttpClient.class);
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+    restGet.setHttpClient(mockedClient);
+
+    restGet.close();
+
+    // verify(mock) checks for exactly one invocation by default
+    verify(mockedClient).close();
+    verifyNoMoreInteractions(mockedClient);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3e73391e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f412036..9a8bd6a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@
         <global_log4j_core_version>2.1</global_log4j_core_version>
         <global_simple_syslog_version>0.0.9</global_simple_syslog_version>
         <global_spark_version>2.3.1</global_spark_version>
+        <global_httpclient_version>4.3.2</global_httpclient_version>
     </properties>
 
     <profiles>

Reply via email to