http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java
new file mode 100644
index 0000000..2acd058
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java
@@ -0,0 +1,372 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.stellar.common.shell;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An input stream which mirrors System.in, but allows you to 'pause' and 
'unpause' it.
+ * The Aeshell has an external thread which is constantly polling System.in.  
If you
+ * need to spawn a program externally (i.e. an editor) which shares stdin, 
this thread
+ * and the spawned program both share a buffer.  This causes contention and 
unpredictable
+ * results (e.g. an input may be consumed by either the aeshell thread or the 
spawned program)
+ *
+ * Because you can inject an input stream into the console, we create this 
which can act as a
+ * facade to System.in under normal 'unpaused' circumstances, and when paused, 
turn off the
+ * access to System.in.  This allows us to turn off access to aeshell while 
maintaining access
+ * to the external program.
+ *
+ */
+public class PausableInput extends InputStream {
+  InputStream in = System.in;
+  boolean paused = false;
+  private PausableInput() {
+    super();
+  }
+
+  /**
+   * Stop mirroring stdin
+   */
+  public void pause() {
+    paused = true;
+  }
+
+  /**
+   * Resume mirroring stdin.
+   * @throws IOException
+   */
+  public void unpause() throws IOException {
+    in.read(new byte[in.available()]);
+    paused = false;
+  }
+
+  public final static PausableInput INSTANCE = new PausableInput();
+
+  /**
+   * Reads the next byte of data from the input stream. The value byte is
+   * returned as an <code>int</code> in the range <code>0</code> to
+   * <code>255</code>. If no byte is available because the end of the stream
+   * has been reached, the value <code>-1</code> is returned. This method
+   * blocks until input data is available, the end of the stream is detected,
+   * or an exception is thrown.
+   * <p>
+   * <p> A subclass must provide an implementation of this method.
+   *
+   * @return the next byte of data, or <code>-1</code> if the end of the
+   * stream is reached.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public int read() throws IOException {
+
+    return in.read();
+  }
+
+  /**
+   * Reads some number of bytes from the input stream and stores them into
+   * the buffer array <code>b</code>. The number of bytes actually read is
+   * returned as an integer.  This method blocks until input data is
+   * available, end of file is detected, or an exception is thrown.
+   * <p>
+   * <p> If the length of <code>b</code> is zero, then no bytes are read and
+   * <code>0</code> is returned; otherwise, there is an attempt to read at
+   * least one byte. If no byte is available because the stream is at the
+   * end of the file, the value <code>-1</code> is returned; otherwise, at
+   * least one byte is read and stored into <code>b</code>.
+   * <p>
+   * <p> The first byte read is stored into element <code>b[0]</code>, the
+   * next one into <code>b[1]</code>, and so on. The number of bytes read is,
+   * at most, equal to the length of <code>b</code>. Let <i>k</i> be the
+   * number of bytes actually read; these bytes will be stored in elements
+   * <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>,
+   * leaving elements <code>b[</code><i>k</i><code>]</code> through
+   * <code>b[b.length-1]</code> unaffected.
+   * <p>
+   * <p> The <code>read(b)</code> method for class <code>InputStream</code>
+   * has the same effect as: <pre><code> read(b, 0, b.length) </code></pre>
+   *
+   * @param b the buffer into which the data is read.
+   * @return the total number of bytes read into the buffer, or
+   * <code>-1</code> if there is no more data because the end of
+   * the stream has been reached.
+   * @throws IOException          If the first byte cannot be read for any 
reason
+   *                              other than the end of the file, if the input 
stream has been closed, or
+   *                              if some other I/O error occurs.
+   * @throws NullPointerException if <code>b</code> is <code>null</code>.
+   * @see InputStream#read(byte[], int, int)
+   */
+  @Override
+  public int read(byte[] b) throws IOException {
+
+    if(paused) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      return 0;
+    }
+    int ret = in.read(b);
+    return ret;
+  }
+
+  /**
+   * Reads up to <code>len</code> bytes of data from the input stream into
+   * an array of bytes.  An attempt is made to read as many as
+   * <code>len</code> bytes, but a smaller number may be read.
+   * The number of bytes actually read is returned as an integer.
+   * <p>
+   * <p> This method blocks until input data is available, end of file is
+   * detected, or an exception is thrown.
+   * <p>
+   * <p> If <code>len</code> is zero, then no bytes are read and
+   * <code>0</code> is returned; otherwise, there is an attempt to read at
+   * least one byte. If no byte is available because the stream is at end of
+   * file, the value <code>-1</code> is returned; otherwise, at least one
+   * byte is read and stored into <code>b</code>.
+   * <p>
+   * <p> The first byte read is stored into element <code>b[off]</code>, the
+   * next one into <code>b[off+1]</code>, and so on. The number of bytes read
+   * is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
+   * bytes actually read; these bytes will be stored in elements
+   * <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
+   * leaving elements <code>b[off+</code><i>k</i><code>]</code> through
+   * <code>b[off+len-1]</code> unaffected.
+   * <p>
+   * <p> In every case, elements <code>b[0]</code> through
+   * <code>b[off]</code> and elements <code>b[off+len]</code> through
+   * <code>b[b.length-1]</code> are unaffected.
+   * <p>
+   * <p> The <code>read(b,</code> <code>off,</code> <code>len)</code> method
+   * for class <code>InputStream</code> simply calls the method
+   * <code>read()</code> repeatedly. If the first such call results in an
+   * <code>IOException</code>, that exception is returned from the call to
+   * the <code>read(b,</code> <code>off,</code> <code>len)</code> method.  If
+   * any subsequent call to <code>read()</code> results in a
+   * <code>IOException</code>, the exception is caught and treated as if it
+   * were end of file; the bytes read up to that point are stored into
+   * <code>b</code> and the number of bytes read before the exception
+   * occurred is returned. The default implementation of this method blocks
+   * until the requested amount of input data <code>len</code> has been read,
+   * end of file is detected, or an exception is thrown. Subclasses are 
encouraged
+   * to provide a more efficient implementation of this method.
+   *
+   * @param b   the buffer into which the data is read.
+   * @param off the start offset in array <code>b</code>
+   *            at which the data is written.
+   * @param len the maximum number of bytes to read.
+   * @return the total number of bytes read into the buffer, or
+   * <code>-1</code> if there is no more data because the end of
+   * the stream has been reached.
+   * @throws IOException               If the first byte cannot be read for 
any reason
+   *                                   other than end of file, or if the input 
stream has been closed, or if
+   *                                   some other I/O error occurs.
+   * @throws NullPointerException      If <code>b</code> is <code>null</code>.
+   * @throws IndexOutOfBoundsException If <code>off</code> is negative,
+   *                                   <code>len</code> is negative, or 
<code>len</code> is greater than
+   *                                   <code>b.length - off</code>
+   * @see InputStream#read()
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if(paused) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      return 0;
+    }
+    int ret = in.read(b, off, len);
+    return ret;
+  }
+
+  /**
+   * Skips over and discards <code>n</code> bytes of data from this input
+   * stream. The <code>skip</code> method may, for a variety of reasons, end
+   * up skipping over some smaller number of bytes, possibly <code>0</code>.
+   * This may result from any of a number of conditions; reaching end of file
+   * before <code>n</code> bytes have been skipped is only one possibility.
+   * The actual number of bytes skipped is returned. If {@code n} is
+   * negative, the {@code skip} method for class {@code InputStream} always
+   * returns 0, and no bytes are skipped. Subclasses may handle the negative
+   * value differently.
+   * <p>
+   * <p> The <code>skip</code> method of this class creates a
+   * byte array and then repeatedly reads into it until <code>n</code> bytes
+   * have been read or the end of the stream has been reached. Subclasses are
+   * encouraged to provide a more efficient implementation of this method.
+   * For instance, the implementation may depend on the ability to seek.
+   *
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   * @throws IOException if the stream does not support seek,
+   *                     or if some other I/O error occurs.
+   */
+  @Override
+  public long skip(long n) throws IOException {
+
+    return in.skip(n);
+  }
+
+  /**
+   * Returns an estimate of the number of bytes that can be read (or
+   * skipped over) from this input stream without blocking by the next
+   * invocation of a method for this input stream. The next invocation
+   * might be the same thread or another thread.  A single read or skip of this
+   * many bytes will not block, but may read or skip fewer bytes.
+   * <p>
+   * <p> Note that while some implementations of {@code InputStream} will 
return
+   * the total number of bytes in the stream, many will not.  It is
+   * never correct to use the return value of this method to allocate
+   * a buffer intended to hold all data in this stream.
+   * <p>
+   * <p> A subclass' implementation of this method may choose to throw an
+   * {@link IOException} if this input stream has been closed by
+   * invoking the {@link #close()} method.
+   * <p>
+   * <p> The {@code available} method for class {@code InputStream} always
+   * returns {@code 0}.
+   * <p>
+   * <p> This method should be overridden by subclasses.
+   *
+   * @return an estimate of the number of bytes that can be read (or skipped
+   * over) from this input stream without blocking or {@code 0} when
+   * it reaches the end of the input stream.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public int available() throws IOException {
+
+    return in.available();
+  }
+
+  /**
+   * Closes this input stream and releases any system resources associated
+   * with the stream.
+   * <p>
+   * <p> The <code>close</code> method of <code>InputStream</code> does
+   * nothing.
+   *
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  /**
+   * Marks the current position in this input stream. A subsequent call to
+   * the <code>reset</code> method repositions this stream at the last marked
+   * position so that subsequent reads re-read the same bytes.
+   * <p>
+   * <p> The <code>readlimit</code> arguments tells this input stream to
+   * allow that many bytes to be read before the mark position gets
+   * invalidated.
+   * <p>
+   * <p> The general contract of <code>mark</code> is that, if the method
+   * <code>markSupported</code> returns <code>true</code>, the stream somehow
+   * remembers all the bytes read after the call to <code>mark</code> and
+   * stands ready to supply those same bytes again if and whenever the method
+   * <code>reset</code> is called.  However, the stream is not required to
+   * remember any data at all if more than <code>readlimit</code> bytes are
+   * read from the stream before <code>reset</code> is called.
+   * <p>
+   * <p> Marking a closed stream should not have any effect on the stream.
+   * <p>
+   * <p> The <code>mark</code> method of <code>InputStream</code> does
+   * nothing.
+   *
+   * @param readlimit the maximum limit of bytes that can be read before
+   *                  the mark position becomes invalid.
+   * @see InputStream#reset()
+   */
+  @Override
+  public synchronized void mark(int readlimit) {
+    in.mark(readlimit);
+  }
+
+  /**
+   * Repositions this stream to the position at the time the
+   * <code>mark</code> method was last called on this input stream.
+   * <p>
+   * <p> The general contract of <code>reset</code> is:
+   * <p>
+   * <ul>
+   * <li> If the method <code>markSupported</code> returns
+   * <code>true</code>, then:
+   * <p>
+   * <ul><li> If the method <code>mark</code> has not been called since
+   * the stream was created, or the number of bytes read from the stream
+   * since <code>mark</code> was last called is larger than the argument
+   * to <code>mark</code> at that last call, then an
+   * <code>IOException</code> might be thrown.
+   * <p>
+   * <li> If such an <code>IOException</code> is not thrown, then the
+   * stream is reset to a state such that all the bytes read since the
+   * most recent call to <code>mark</code> (or since the start of the
+   * file, if <code>mark</code> has not been called) will be resupplied
+   * to subsequent callers of the <code>read</code> method, followed by
+   * any bytes that otherwise would have been the next input data as of
+   * the time of the call to <code>reset</code>. </ul>
+   * <p>
+   * <li> If the method <code>markSupported</code> returns
+   * <code>false</code>, then:
+   * <p>
+   * <ul><li> The call to <code>reset</code> may throw an
+   * <code>IOException</code>.
+   * <p>
+   * <li> If an <code>IOException</code> is not thrown, then the stream
+   * is reset to a fixed state that depends on the particular type of the
+   * input stream and how it was created. The bytes that will be supplied
+   * to subsequent callers of the <code>read</code> method depend on the
+   * particular type of the input stream. </ul></ul>
+   * <p>
+   * <p>The method <code>reset</code> for class <code>InputStream</code>
+   * does nothing except throw an <code>IOException</code>.
+   *
+   * @throws IOException if this stream has not been marked or if the
+   *                     mark has been invalidated.
+   * @see InputStream#mark(int)
+   * @see IOException
+   */
+  @Override
+  public synchronized void reset() throws IOException {
+    in.reset();
+  }
+
+  /**
+   * Tests if this input stream supports the <code>mark</code> and
+   * <code>reset</code> methods. Whether or not <code>mark</code> and
+   * <code>reset</code> are supported is an invariant property of a
+   * particular input stream instance. The <code>markSupported</code> method
+   * of <code>InputStream</code> returns <code>false</code>.
+   *
+   * @return <code>true</code> if this stream instance supports the mark
+   * and reset methods; <code>false</code> otherwise.
+   * @see InputStream#mark(int)
+   * @see InputStream#reset()
+   */
+  @Override
+  public boolean markSupported() {
+    return in.markSupported();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
new file mode 100644
index 0000000..1181c6f
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
@@ -0,0 +1,322 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.stellar.common.shell;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections4.trie.PatriciaTrie;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.stellar.common.configuration.ConfigurationsUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctionInfo;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.stellar.common.StellarProcessor;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.jboss.aesh.console.Console;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static 
org.apache.metron.stellar.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper;
+import static 
org.apache.metron.stellar.common.shell.StellarExecutor.OperationType.DOC;
+import static 
org.apache.metron.stellar.common.shell.StellarExecutor.OperationType.NORMAL;
+
+import static org.apache.metron.stellar.dsl.Context.Capabilities.*;
+
+/**
+ * Executes Stellar expressions and maintains state across multiple 
invocations.
+ */
+public class StellarExecutor {
+
+  public static String SHELL_VARIABLES = "shellVariables";
+  public static String CONSOLE = "console";
+
+  private ReadWriteLock indexLock = new ReentrantReadWriteLock();
+
+  public static class VariableResult {
+    private String expression;
+    private Object result;
+
+    public VariableResult(String expression, Object result) {
+      this.expression = expression;
+      this.result = result;
+    }
+
+    public String getExpression() {
+      return expression;
+    }
+
+    public Object getResult() {
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      String ret = "" + result;
+      if(expression != null) {
+        ret += " via " + expression;
+      }
+      return ret;
+    }
+  }
+
+  /**
+   * prefix tree index of autocompletes
+   */
+  private PatriciaTrie<AutoCompleteType> autocompleteIndex;
+  /**
+   * The variables known by Stellar.
+   */
+  private Map<String, VariableResult> variables;
+
+  /**
+   * The function resolver.
+   */
+  private FunctionResolver functionResolver;
+
+  /**
+   * A Zookeeper client. Only defined if given a valid Zookeeper URL.
+   */
+  private Optional<CuratorFramework> client;
+
+  /**
+   * The Stellar execution context.
+   */
+  private Context context;
+
+  private Console console;
+
+  public enum OperationType {
+    DOC,MAGIC,NORMAL;
+  }
+
+  public interface AutoCompleteTransformation {
+    String transform(OperationType type, String key);
+  }
+
+  public enum AutoCompleteType implements AutoCompleteTransformation{
+      FUNCTION((type, key) -> {
+        if(type == DOC) {
+          return StellarShell.DOC_PREFIX + key;
+        }
+        else if(type == NORMAL) {
+          return key + "(";
+        }
+        return key;
+      })
+    , VARIABLE((type, key) -> key )
+    , TOKEN((type, key) -> key)
+    ;
+    AutoCompleteTransformation transform;
+    AutoCompleteType(AutoCompleteTransformation transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public String transform(OperationType type, String key) {
+      return transform.transform(type, key);
+    }
+
+  }
+
+  /**
+   * @param console The console used to drive the REPL.
+   * @param properties The Stellar properties.
+   * @throws Exception
+   */
+  public StellarExecutor(Console console, Properties properties) throws 
Exception {
+    this(null, console, properties);
+  }
+
+  /**
+   * @param console The console used to drive the REPL.
+   * @param properties The Stellar properties.
+   * @throws Exception
+   */
+  public StellarExecutor(String zookeeperUrl, Console console, Properties 
properties) throws Exception {
+    this.variables = new HashMap<>();
+    this.client = createClient(zookeeperUrl);
+    this.context = createContext(properties);
+
+    // initialize the default function resolver
+    StellarFunctions.initialize(this.context);
+    this.functionResolver = StellarFunctions.FUNCTION_RESOLVER();
+
+    this.autocompleteIndex = initializeIndex();
+    this.console = console;
+
+    // asynchronously update the index with function names found from a 
classpath scan.
+    new Thread( () -> {
+        Iterable<StellarFunctionInfo> functions = 
functionResolver.getFunctionInfo();
+        indexLock.writeLock().lock();
+        try {
+          for(StellarFunctionInfo info: functions) {
+            String functionName = info.getName();
+            autocompleteIndex.put(functionName, AutoCompleteType.FUNCTION);
+          }
+        }
+          finally {
+            System.out.println("Functions loaded, you may refer to functions 
now...");
+            indexLock.writeLock().unlock();
+          }
+    }).start();
+  }
+
+  private PatriciaTrie<AutoCompleteType> initializeIndex() {
+    Map<String, AutoCompleteType> index = new HashMap<>();
+
+    index.put("==", AutoCompleteType.TOKEN);
+    index.put(">=", AutoCompleteType.TOKEN);
+    index.put("<=", AutoCompleteType.TOKEN);
+    index.put(":=", AutoCompleteType.TOKEN);
+    index.put("quit", AutoCompleteType.TOKEN);
+    index.put(StellarShell.MAGIC_FUNCTIONS, AutoCompleteType.FUNCTION);
+    index.put(StellarShell.MAGIC_VARS, AutoCompleteType.FUNCTION);
+    return new PatriciaTrie<>(index);
+  }
+
+  public Iterable<String> autoComplete(String buffer, final OperationType 
opType) {
+    indexLock.readLock().lock();
+    try {
+      SortedMap<String, AutoCompleteType> ret = 
autocompleteIndex.prefixMap(buffer);
+      if (ret.isEmpty()) {
+        return new ArrayList<>();
+      }
+      return Iterables.transform(ret.entrySet(), kv -> 
kv.getValue().transform(opType, kv.getKey()));
+    }
+    finally {
+      indexLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Creates a Zookeeper client.
+   * @param zookeeperUrl The Zookeeper URL.
+   */
+  private Optional<CuratorFramework> createClient(String zookeeperUrl) {
+
+    // can only create client, if have valid zookeeper URL
+    if(StringUtils.isNotBlank(zookeeperUrl)) {
+      CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+      client.start();
+      return Optional.of(client);
+
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Creates a Context initialized with configuration stored in Zookeeper.
+   */
+  private Context createContext(Properties properties) throws Exception {
+
+    Context.Builder contextBuilder = new Context.Builder()
+            .with(SHELL_VARIABLES, () -> variables)
+            .with(CONSOLE, () -> console)
+            .with(STELLAR_CONFIG, () -> properties);
+
+    // load global configuration from zookeeper
+    if (client.isPresent()) {
+
+      // fetch the global configuration
+      Map<String, Object> global = JSONUtils.INSTANCE.load(
+              new 
ByteArrayInputStream(readGlobalConfigBytesFromZookeeper(client.get())),
+              new TypeReference<Map<String, Object>>() {});
+
+      contextBuilder
+              .with(GLOBAL_CONFIG, () -> global)
+              .with(ZOOKEEPER_CLIENT, () -> client.get())
+              .with(STELLAR_CONFIG, () -> getStellarConfig(global, 
properties));
+    }
+
+    return contextBuilder.build();
+  }
+
+  private Map<String, Object> getStellarConfig(Map<String, Object> 
globalConfig, Properties props) {
+    Map<String, Object> ret = new HashMap<>();
+    ret.putAll(globalConfig);
+    if(props != null) {
+      for (Map.Entry<Object, Object> kv : props.entrySet()) {
+        ret.put(kv.getKey().toString(), kv.getValue());
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Executes the Stellar expression and returns the result.
+   * @param expression The Stellar expression to execute.
+   * @return The result of the expression.
+   */
+  public Object execute(String expression) {
+    VariableResolver variableResolver = new 
MapVariableResolver(Maps.transformValues(variables, result -> 
result.getResult())
+                                                               , 
Collections.emptyMap());
+    StellarProcessor processor = new StellarProcessor();
+    return processor.parse(expression, variableResolver, functionResolver, 
context);
+  }
+
+  /**
+   * Assigns a value to a variable.
+   * @param variable The name of the variable.
+   * @param value The value of the variable
+   */
+  public void assign(String variable, String expression, Object value) {
+    this.variables.put(variable, new VariableResult(expression, value));
+    indexLock.writeLock().lock();
+    try {
+      if (value != null) {
+        this.autocompleteIndex.put(variable, AutoCompleteType.VARIABLE);
+      } else {
+        this.autocompleteIndex.remove(variable);
+      }
+    }
+    finally {
+      indexLock.writeLock().unlock();
+    }
+  }
+
+  public Map<String, VariableResult> getVariables() {
+    return this.variables;
+  }
+
+  public FunctionResolver getFunctionResolver() {
+    return functionResolver;
+  }
+
+  public Context getContext() {
+    return context;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
new file mode 100644
index 0000000..1b51415
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
@@ -0,0 +1,440 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.stellar.common.shell;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctionInfo;
+import org.apache.metron.stellar.common.StellarAssignment;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.jboss.aesh.complete.CompleteOperation;
+import org.jboss.aesh.complete.Completion;
+import org.jboss.aesh.console.AeshConsoleCallback;
+import org.jboss.aesh.console.Console;
+import org.jboss.aesh.console.ConsoleOperation;
+import org.jboss.aesh.console.Prompt;
+import org.jboss.aesh.console.settings.SettingsBuilder;
+import org.jboss.aesh.terminal.CharacterType;
+import org.jboss.aesh.terminal.Color;
+import org.jboss.aesh.terminal.TerminalCharacter;
+import org.jboss.aesh.terminal.TerminalColor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * A REPL environment for Stellar.
+ *
+ * Useful for debugging Stellar expressions.
+ */
+public class StellarShell extends AeshConsoleCallback implements Completion {
+
+  private static final String WELCOME = "Stellar, Go!\n" +
+          "Please note that functions are loading lazily in the background and 
will be unavailable until loaded fully.";
+  private List<TerminalCharacter> EXPRESSION_PROMPT = new 
ArrayList<TerminalCharacter>()
+  {{
+    add(new TerminalCharacter('[', new TerminalColor(Color.RED, 
Color.DEFAULT)));
+    add(new TerminalCharacter('S', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('t', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('e', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('a', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter('r', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.BOLD));
+    add(new TerminalCharacter(']', new TerminalColor(Color.RED, 
Color.DEFAULT)));
+    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.UNDERLINE));
+    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.UNDERLINE));
+    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, 
Color.DEFAULT), CharacterType.UNDERLINE));
+    add(new TerminalCharacter(' ', new TerminalColor(Color.DEFAULT, 
Color.DEFAULT)));
+  }};
+
+  public static final String ERROR_PROMPT = "[!] ";
+  public static final String MAGIC_PREFIX = "%";
+  public static final String MAGIC_FUNCTIONS = MAGIC_PREFIX + "functions";
+  public static final String MAGIC_VARS = MAGIC_PREFIX + "vars";
+  public static final String DOC_PREFIX = "?";
+  public static final String STELLAR_PROPERTIES_FILENAME = 
"stellar.properties";
+
+  private StellarExecutor executor;
+
+  private Console console;
+
+  /**
+   * Execute the Stellar REPL.
+   */
+  public static void main(String[] args) throws Exception {
+    StellarShell shell = new StellarShell(args);
+    shell.execute();
+  }
+
+  /**
+   * Create a Stellar REPL.
+   * @param args The commmand-line arguments.
+   */
+  public StellarShell(String[] args) throws Exception {
+
+    // define valid command-line options
+    Options options = new Options();
+    options.addOption("z", "zookeeper", true, "Zookeeper URL");
+    options.addOption("v", "variables", true, "File containing a JSON Map of 
variables");
+    options.addOption("irc", "inputrc", true, "File containing the inputrc if 
not the default ~/.inputrc");
+    options.addOption("na", "no_ansi", false, "Make the input prompt not use 
ANSI colors.");
+    options.addOption("h", "help", false, "Print help");
+    options.addOption("p", "properties", true, "File containing Stellar 
properties");
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = parser.parse(options, args);
+
+    // print help
+    if(commandLine.hasOption("h")) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("stellar", options);
+      System.exit(0);
+    }
+
+    console = createConsole(commandLine);
+    executor = createExecutor(commandLine, console, 
getStellarProperties(commandLine));
+    loadVariables(commandLine, executor);
+    console.setPrompt(new Prompt(EXPRESSION_PROMPT));
+    console.addCompletion(this);
+    console.setConsoleCallback(this);
+  }
+
+  /**
+   * Loads any variables defined in an external file.
+   * @param commandLine The command line arguments.
+   * @param executor The stellar executor.
+   * @throws IOException
+   */
+  private static void loadVariables(CommandLine commandLine, StellarExecutor 
executor) throws IOException {
+    if(commandLine.hasOption("v")) {
+
+      Map<String, Object> variables = JSONUtils.INSTANCE.load(
+              new File(commandLine.getOptionValue("v")),
+              new TypeReference<Map<String, Object>>() {});
+
+      for(Map.Entry<String, Object> kv : variables.entrySet()) {
+        executor.assign(kv.getKey(), null, kv.getValue());
+      }
+    }
+  }
+
+  /**
+   * Creates the Stellar execution environment.
+   * @param commandLine The command line arguments.
+   * @param console The console which drives the REPL.
+   * @param properties Stellar properties.
+   */
+  private static StellarExecutor createExecutor(CommandLine commandLine, 
Console console, Properties properties) throws Exception {
+    StellarExecutor executor;
+
+    // create the executor
+    if(commandLine.hasOption("z")) {
+      String zookeeperUrl = commandLine.getOptionValue("z");
+      executor = new StellarExecutor(zookeeperUrl, console, properties);
+
+    } else {
+      executor = new StellarExecutor(console, properties);
+    }
+
+    return executor;
+  }
+
+  /**
+   * Creates the REPL's console.
+   * @param commandLine The command line options.
+   */
+  private Console createConsole(CommandLine commandLine) {
+
+    // console settings
+    boolean useAnsi = !commandLine.hasOption("na");
+    SettingsBuilder settings = new SettingsBuilder().enableAlias(true)
+                                                    .enableMan(true)
+                                                    .ansi(useAnsi)
+                                                    .parseOperators(false)
+                                                    
.inputStream(PausableInput.INSTANCE);
+
+    if(commandLine.hasOption("irc")) {
+      settings = settings.inputrc(new File(commandLine.getOptionValue("irc")));
+    }
+
+    return new Console(settings.create());
+  }
+
+  /**
+   * Retrieves the Stellar properties. The properties are either loaded from a 
file in
+   * the classpath or a set of defaults are used.
+   */
+  private Properties getStellarProperties(CommandLine commandLine) throws 
IOException {
+    Properties properties = new Properties();
+
+    if (commandLine.hasOption("p")) {
+
+      // first attempt to load properties from a file specified on the 
command-line
+      try (InputStream in = new 
FileInputStream(commandLine.getOptionValue("p"))) {
+        if(in != null) {
+          properties.load(in);
+        }
+      }
+
+    } else {
+
+      // otherwise attempt to load properties from the classpath
+      try (InputStream in = 
getClass().getClassLoader().getResourceAsStream(STELLAR_PROPERTIES_FILENAME)) {
+        if(in != null) {
+          properties.load(in);
+        }
+      }
+    }
+
+    return properties;
+  }
+
+  /**
+   * Handles the main loop for the REPL.
+   */
+  public void execute() {
+
+    // welcome message and print globals
+    writeLine(WELCOME);
+    executor.getContext()
+            .getCapability(Context.Capabilities.GLOBAL_CONFIG, false)
+            .ifPresent(conf -> writeLine(conf.toString()));
+
+    console.start();
+  }
+
+  /**
+   * Handles user interaction when executing a Stellar expression.
+   * @param expression The expression to execute.
+   */
+  private void handleStellar(String expression) {
+
+    String stellarExpression = expression;
+    String variable = null;
+    if(StellarAssignment.isAssignment(expression)) {
+      StellarAssignment expr = StellarAssignment.from(expression);
+      variable = expr.getVariable();
+      stellarExpression = expr.getStatement();
+    }
+    else {
+      if (!stellarExpression.isEmpty()) {
+        stellarExpression = stellarExpression.trim();
+      }
+    }
+    Object result = executeStellar(stellarExpression);
+    if(result != null && variable == null) {
+      writeLine(result.toString());
+    }
+    if(variable != null) {
+      executor.assign(variable, stellarExpression, result);
+    }
+  }
+
+  /**
+   * Handles user interaction when executing a Magic command.
+   * @param rawExpression The expression to execute.
+   */
+  private void handleMagic( String rawExpression) {
+    String expression = rawExpression.trim();
+    if(MAGIC_FUNCTIONS.equals(expression)) {
+
+      // list all functions
+      String functions = StreamSupport
+              
.stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
+              .map(info -> String.format("%s", info.getName()))
+              .sorted()
+              .collect(Collectors.joining(", "));
+      writeLine(functions);
+
+    } else if(MAGIC_VARS.equals(expression)) {
+
+      // list all variables
+
+      executor.getVariables()
+              .forEach((k,v) -> writeLine(String.format("%s = %s", k, v)));
+
+    } else {
+      writeLine(ERROR_PROMPT + "undefined magic command: " + expression);
+    }
+  }
+
+  /**
+   * Handles user interaction when executing a doc command.
+   * @param expression The expression to execute.
+   */
+  private void handleDoc(String expression) {
+
+    String functionName = StringUtils.substring(expression, 1);
+    StreamSupport
+            
.stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
+            .filter(info -> StringUtils.equals(functionName, info.getName()))
+            .map(info -> format(info))
+            .forEach(doc -> write(doc));
+  }
+
+  /**
+   * Formats the Stellar function info object into a readable string.
+   * @param info The stellar function info object.
+   * @return A readable string.
+   */
+  private String format(StellarFunctionInfo info) {
+    StringBuffer ret = new StringBuffer();
+    ret.append(info.getName() + "\n");
+    ret.append(String.format("Description: %-60s\n\n", info.getDescription()));
+    if(info.getParams().length > 0) {
+      ret.append("Arguments:\n");
+      for(String param : info.getParams()) {
+        ret.append(String.format("\t%-60s\n", param));
+      }
+      ret.append("\n");
+    }
+    ret.append(String.format("Returns: %-60s\n", info.getReturns()));
+
+    return ret.toString();
+  }
+
+  /**
+   * Is a given expression a built-in magic?
+   * @param expression The expression.
+   */
+  private boolean isMagic(String expression) {
+    return StringUtils.startsWith(expression, MAGIC_PREFIX);
+  }
+
+  /**
+   * Is a given expression asking for function documentation?
+   * @param expression The expression.
+   */
+  private boolean isDoc(String expression) {
+    return StringUtils.startsWith(expression, DOC_PREFIX);
+  }
+
+  /**
+   * Executes a Stellar expression.
+   * @param expression The expression to execute.
+   * @return The result of the expression.
+   */
+  private Object executeStellar(String expression) {
+    Object result = null;
+
+    try {
+      result = executor.execute(expression);
+
+    } catch(Throwable t) {
+      writeLine(ERROR_PROMPT + t.getMessage());
+      t.printStackTrace();
+    }
+
+    return result;
+  }
+
+  private void write(String out) {
+    System.out.print(out);
+  }
+
+  private void writeLine(String out) {
+    console.getShell().out().println(out);
+  }
+
+  @Override
+  public int execute(ConsoleOperation output) throws InterruptedException {
+    String expression = output.getBuffer().trim();
+    if(StringUtils.isNotBlank(expression) ) {
+      if(isMagic(expression)) {
+        handleMagic( expression);
+
+      } else if(isDoc(expression)) {
+        handleDoc(expression);
+
+      } else if (expression.equals("quit")) {
+        try {
+          console.stop();
+        } catch (Throwable e) {
+          e.printStackTrace();
+        }
+      }
+      else if(expression.charAt(0) == '#') {
+        return 0;
+      }
+      else {
+        handleStellar(expression);
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public void complete(CompleteOperation completeOperation) {
+    if(!completeOperation.getBuffer().isEmpty()) {
+      String lastToken = Iterables.getLast(Splitter.on(" 
").split(completeOperation.getBuffer()), null);
+      if(lastToken != null && !lastToken.isEmpty()) {
+        lastToken = lastToken.trim();
+        final String lastBit = lastToken;
+        final boolean isDocRequest = isDoc(lastToken);
+        if(isDocRequest) {
+          lastToken = lastToken.substring(1);
+        }
+        StellarExecutor.OperationType opType = 
StellarExecutor.OperationType.NORMAL;
+        if(isDocRequest) {
+          opType = StellarExecutor.OperationType.DOC;
+        }
+        else if(isMagic(lastToken)) {
+          opType = StellarExecutor.OperationType.MAGIC;
+        }
+        Iterable<String> candidates = executor.autoComplete(lastToken, opType);
+        if(candidates != null && !Iterables.isEmpty(candidates)) {
+          completeOperation.setCompletionCandidates( Lists.newArrayList(
+                  Iterables.transform(candidates, s -> 
stripOff(completeOperation.getBuffer(), lastBit) + s )
+                  )
+          );
+        }
+      }
+    }
+
+  }
+
+  private static String stripOff(String baseString, String lastBit) {
+    int index = baseString.lastIndexOf(lastBit);
+    if(index < 0) {
+      return baseString;
+    }
+    return baseString.substring(0, index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java
new file mode 100644
index 0000000..66c9e9d
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common.system;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class Clock {
+  private final static String UTC = "UTC";
+
+  public long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+
+  public String currentTimeFormatted(String stdDateFormat) {
+    SimpleDateFormat format = new SimpleDateFormat(stdDateFormat);
+    format.setTimeZone(TimeZone.getTimeZone(UTC));
+    return format.format(new Date(currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Environment.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Environment.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Environment.java
new file mode 100644
index 0000000..2ad4b19
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Environment.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common.system;
+
+/**
+ * Useful so we can test mock dependency injection with environment variables
+ */
+public class Environment {
+  public String get(String variable) {
+    return System.getenv().get(variable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
new file mode 100644
index 0000000..445dca5
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common.utils;
+
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+public class BloomFilter<T> implements Serializable {
+
+  private static class BloomFunnel<T> implements Funnel<T>, Serializable {
+    Function<T, byte[]> serializer;
+    public BloomFunnel(Function<T, byte[]> serializer) {
+      this.serializer = serializer;
+    }
+    @Override
+    public void funnel(T obj, PrimitiveSink primitiveSink) {
+      primitiveSink.putBytes(serializer.apply(obj));
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this.getClass().equals(obj.getClass());
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode() * 31;
+    }
+  }
+
+  public static class DefaultSerializer<T> implements Function<T, byte[]> {
+    @Override
+    public byte[] apply(T t) {
+      return SerDeUtils.toBytes(t);
+    }
+  }
+  private com.google.common.hash.BloomFilter<T> filter;
+
+  public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, 
double falsePositiveRate) {
+    filter = com.google.common.hash.BloomFilter.create(new 
BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate);
+  }
+
+  public boolean mightContain(T key) {
+    return filter.mightContain(key);
+  }
+  public void add(T key) {
+    filter.put(key);
+  }
+  public void merge(BloomFilter<T> filter2) {
+    filter.putAll(filter2.filter);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    BloomFilter<?> that = (BloomFilter<?>) o;
+
+    return filter != null ? filter.equals(that.filter) : that.filter == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return filter != null ? filter.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
new file mode 100644
index 0000000..b53097f
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.stellar.common.utils;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.beanutils.BeanUtilsBean2;
+import org.apache.commons.beanutils.ConvertUtilsBean;
+
+import java.util.List;
+
+public class ConversionUtils {
+  private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN = new 
ThreadLocal<ConvertUtilsBean>() {
+    @Override
+    protected ConvertUtilsBean initialValue() {
+      ConvertUtilsBean ret = BeanUtilsBean2.getInstance().getConvertUtils();
+      ret.deregister();
+      ret.register(false, true, 1);
+      return ret;
+    }
+  };
+
+  public static <T> T convert(Object o, Class<T> clazz) {
+    if (o == null) {
+      return null;
+    }
+    return clazz.cast(UTILS_BEAN.get().convert(o, clazz));
+  }
+
+  /**
+   * Performs naive List type conversion.
+   *
+   * @param from Source list
+   * @param clazz Class type to cast the List elements to
+   * @param <T> Source element type
+   * @param <U> Desired element type
+   * @return New List with the elements cast to the desired type
+   */
+  public static <T, U> List<U> convertList(List<T> from, Class<U> clazz) {
+    return Lists.transform(from, s -> convert(s, clazz));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
new file mode 100644
index 0000000..d99263b
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.stellar.common.utils;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import java.io.*;
+
+public enum JSONUtils {
+  INSTANCE;
+
+  private static ThreadLocal<JSONParser> _parser = ThreadLocal.withInitial(() 
->
+          new JSONParser());
+
+  private static ThreadLocal<ObjectMapper> _mapper = 
ThreadLocal.withInitial(() ->
+          new 
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL));
+
+  public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  public <T> T load(String is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
+  public <T> T load(File f, TypeReference<T> ref) throws IOException {
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return _mapper.get().readValue(is, ref);
+    }
+  }
+
+  public <T> T load(InputStream is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  public <T> T load(File f, Class<T> clazz) throws IOException {
+    try (InputStream is = new BufferedInputStream(new FileInputStream(f))) {
+      return _mapper.get().readValue(is, clazz);
+    }
+  }
+
+  public <T> T load(String is, Class<T> clazz) throws IOException {
+    return _mapper.get().readValue(is, clazz);
+  }
+
+  public String toJSON(Object o, boolean pretty) throws 
JsonProcessingException {
+    if (pretty) {
+      return 
_mapper.get().writerWithDefaultPrettyPrinter().writeValueAsString(o);
+    } else {
+      return _mapper.get().writeValueAsString(o);
+    }
+  }
+
+  public byte[] toJSON(Object config) throws JsonProcessingException {
+    return _mapper.get().writeValueAsBytes(config);
+  }
+
+  /**
+   * Transforms a bean (aka POJO) to a JSONObject.
+   */
+  public JSONObject toJSONObject(Object o) throws JsonProcessingException, 
ParseException {
+    return (JSONObject) _parser.get().parse(toJSON(o, false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
new file mode 100644
index 0000000..7bd9520
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
@@ -0,0 +1,256 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.stellar.common.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.Util;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
+import de.javakaffee.kryoserializers.*;
+import de.javakaffee.kryoserializers.cglib.CGLibProxySerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.objenesis.instantiator.ObjectInstantiator;
+import org.objenesis.strategy.InstantiatorStrategy;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Modifier;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.function.Function;
+
+import static com.esotericsoftware.kryo.util.Util.className;
+
+/**
+ * Provides basic functionality to serialize and deserialize the allowed
+ * value types for a ProfileMeasurement.
+ */
+public class SerDeUtils {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(SerDeUtils.class);
+  private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+    @Override
+    protected Kryo initialValue() {
+      Kryo ret = new Kryo();
+      ret.setReferences(true);
+      ret.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new 
StdInstantiatorStrategy()));
+
+      ret.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
+      ret.register(Collections.EMPTY_LIST.getClass(), new 
CollectionsEmptyListSerializer());
+      ret.register(Collections.EMPTY_MAP.getClass(), new 
CollectionsEmptyMapSerializer());
+      ret.register(Collections.EMPTY_SET.getClass(), new 
CollectionsEmptySetSerializer());
+      ret.register(Collections.singletonList("").getClass(), new 
CollectionsSingletonListSerializer());
+      ret.register(Collections.singleton("").getClass(), new 
CollectionsSingletonSetSerializer());
+      ret.register(Collections.singletonMap("", "").getClass(), new 
CollectionsSingletonMapSerializer());
+      ret.register(GregorianCalendar.class, new GregorianCalendarSerializer());
+      ret.register(InvocationHandler.class, new JdkProxySerializer());
+      UnmodifiableCollectionsSerializer.registerSerializers(ret);
+      SynchronizedCollectionsSerializer.registerSerializers(ret);
+
+// custom serializers for non-jdk libs
+
+// register CGLibProxySerializer, works in combination with the appropriate 
action in handleUnregisteredClass (see below)
+      ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new 
CGLibProxySerializer());
+// joda DateTime, LocalDate and LocalDateTime
+      ret.register(LocalDate.class, new JodaLocalDateSerializer());
+      ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer());
+// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, 
UnmodifiableNavigableSet
+      ImmutableListSerializer.registerSerializers(ret);
+      ImmutableSetSerializer.registerSerializers(ret);
+      ImmutableMapSerializer.registerSerializers(ret);
+      ImmutableMultimapSerializer.registerSerializers(ret);
+      return ret;
+    }
+  };
+
+  /**
+   * This was backported from a more recent version of kryo than we currently 
run.  The reason why it exists is
+   * that we want a strategy for instantiation of classes which attempts a 
no-arg constructor first and THEN falls
+   * back to reflection for performance reasons alone (this is, after all, in 
the critical path).
+   *
+   */
+  static private class DefaultInstantiatorStrategy implements 
org.objenesis.strategy.InstantiatorStrategy {
+    private InstantiatorStrategy fallbackStrategy;
+
+    public DefaultInstantiatorStrategy () {
+    }
+
+    public DefaultInstantiatorStrategy (InstantiatorStrategy fallbackStrategy) 
{
+      this.fallbackStrategy = fallbackStrategy;
+    }
+
+    public void setFallbackInstantiatorStrategy (final InstantiatorStrategy 
fallbackStrategy) {
+      this.fallbackStrategy = fallbackStrategy;
+    }
+
+    public InstantiatorStrategy getFallbackInstantiatorStrategy () {
+      return fallbackStrategy;
+    }
+
+    @Override
+    public ObjectInstantiator newInstantiatorOf (final Class type) {
+      if (!Util.isAndroid) {
+        // Use ReflectASM if the class is not a non-static member class.
+        Class enclosingType = type.getEnclosingClass();
+        boolean isNonStaticMemberClass = enclosingType != null && 
type.isMemberClass()
+                && !Modifier.isStatic(type.getModifiers());
+        if (!isNonStaticMemberClass) {
+          try {
+            final ConstructorAccess access = ConstructorAccess.get(type);
+            return new ObjectInstantiator() {
+              @Override
+              public Object newInstance () {
+                try {
+                  return access.newInstance();
+                } catch (Exception ex) {
+                  throw new KryoException("Error constructing instance of 
class: " + className(type), ex);
+                }
+              }
+            };
+          } catch (Exception ignored) {
+          }
+        }
+      }
+      // Reflection.
+      try {
+        Constructor ctor;
+        try {
+          ctor = type.getConstructor((Class[])null);
+        } catch (Exception ex) {
+          ctor = type.getDeclaredConstructor((Class[])null);
+          ctor.setAccessible(true);
+        }
+        final Constructor constructor = ctor;
+        return new ObjectInstantiator() {
+          @Override
+          public Object newInstance () {
+            try {
+              return constructor.newInstance();
+            } catch (Exception ex) {
+              throw new KryoException("Error constructing instance of class: " 
+ className(type), ex);
+            }
+          }
+        };
+      } catch (Exception ignored) {
+      }
+      if (fallbackStrategy == null) {
+        if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers()))
+          throw new KryoException("Class cannot be created (non-static member 
class): " + className(type));
+        else
+          throw new KryoException("Class cannot be created (missing no-arg 
constructor): " + className(type));
+      }
+      // InstantiatorStrategy.
+      return fallbackStrategy.newInstantiatorOf(type);
+    }
+  }
+
+  public static Serializer SERIALIZER = new Serializer();
+
+  private static class Serializer implements Function<Object, byte[]> {
+    /**
+     * Serializes the given Object into bytes.
+     *
+     */
+    @Override
+    public byte[] apply(Object o) {
+      return toBytes(o);
+    }
+  }
+
+  public static class Deserializer<T> implements Function<byte[], T> {
+
+    private Class<T> clazz;
+    public Deserializer(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+    /**
+     * Deserializes the given bytes.
+     *
+     * @param bytes the function argument
+     * @return the function result
+     */
+    @Override
+    public T apply(byte[] bytes) {
+      return fromBytes(bytes, clazz);
+    }
+  }
+
+
+  private SerDeUtils() {
+    // do not instantiate
+  }
+
+  /**
+   * Serialize a profile measurement's value.
+   *
+   * The value produced by a Profile definition can be any numeric data type.  
The data
+   * type depends on how the profile is defined by the user.  The user should 
be able to
+   * choose the data type that is most suitable for their use case.
+   *
+   * @param value The value to serialize.
+   */
+  public static byte[] toBytes(Object value) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      kryo.get().writeClassAndObject(output, value);
+      output.flush();
+      bos.flush();
+      return bos.toByteArray();
+    }
+    catch(Throwable t) {
+      LOG.error("Unable to serialize: " + value + " because " + 
t.getMessage(), t);
+      throw new IllegalStateException("Unable to serialize " + value + " 
because " + t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Deserialize a profile measurement's value.
+   *
+   * The value produced by a Profile definition can be any numeric data type.  
The data
+   * type depends on how the profile is defined by the user.  The user should 
be able to
+   * choose the data type that is most suitable for their use case.
+   *
+   * @param value The value to deserialize.
+   */
+  public static <T> T fromBytes(byte[] value, Class<T> clazz) {
+    try {
+      Input input = new Input(new ByteArrayInputStream(value));
+      return clazz.cast(kryo.get().readClassAndObject(input));
+    }
+    catch(Throwable t) {
+      LOG.error("Unable to deserialize  because " + t.getMessage(), t);
+      throw t;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
new file mode 100644
index 0000000..cc19b5f
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.stellar.common.utils;
+
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.common.StellarProcessor;
+import org.junit.Assert;
+
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.function.IntConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class StellarProcessorUtils {
+
+    /**
+     * This utility class is intended for use while unit testing Stellar 
operators.
+     * It is included in the "main" code so third-party operators will not need
+     * a test dependency on Stellar's test-jar.
+     *
+     * This class ensures the basic contract of a stellar expression is 
adhered to:
+     * 1. Validate works on the expression
+     * 2. The output can be serialized and deserialized properly
+     *
+     * @param rule
+     * @param variables
+     * @param context
+     * @return
+     */
+    public static Object run(String rule, Map<String, Object> variables, 
Context context) {
+        StellarProcessor processor = new StellarProcessor();
+        Assert.assertTrue(rule + " not valid.", processor.validate(rule, 
context));
+        Object ret = processor.parse(rule, x -> variables.get(x), 
StellarFunctions.FUNCTION_RESOLVER(), context);
+        byte[] raw = SerDeUtils.toBytes(ret);
+        Object actual = SerDeUtils.fromBytes(raw, Object.class);
+        Assert.assertEquals(ret, actual);
+        return ret;
+    }
+
+  public static Object run(String rule, Map<String, Object> variables) {
+    return run(rule, variables, Context.EMPTY_CONTEXT());
+  }
+
+  public static boolean runPredicate(String rule, Map resolver) {
+    return runPredicate(rule, resolver, Context.EMPTY_CONTEXT());
+  }
+
+  public static boolean runPredicate(String rule, Map resolver, Context 
context) {
+    return runPredicate(rule, new MapVariableResolver(resolver), context);
+  }
+
+  public static boolean runPredicate(String rule, VariableResolver resolver) {
+    return runPredicate(rule, resolver, Context.EMPTY_CONTEXT());
+  }
+
+  public static boolean runPredicate(String rule, VariableResolver resolver, 
Context context) {
+    StellarPredicateProcessor processor = new StellarPredicateProcessor();
+    Assert.assertTrue(rule + " not valid.", processor.validate(rule));
+    return processor.parse(rule, resolver, 
StellarFunctions.FUNCTION_RESOLVER(), context);
+  }
+
+  public static void runWithArguments(String function, Object argument, Object 
expected) {
+    runWithArguments(function, ImmutableList.of(argument), expected);
+  }
+
+  public static void runWithArguments(String function, List<Object> arguments, 
Object expected) {
+    Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> 
StreamSupport.stream(new XRange(arguments.size()), false)
+            .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, 
arguments.get(i)));
+
+    String args = kvStream.get().map( kv -> kv.getKey())
+                                .collect(Collectors.joining(","));
+    Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv 
-> kv.getKey(), kv -> kv.getValue()));
+    String stellarStatement =  function + "(" + args + ")";
+    String reason = stellarStatement + " != " + expected + " with variables: " 
+ variables;
+
+    if(expected instanceof Double) {
+      Assert.assertEquals(reason, (Double)expected, 
(Double)run(stellarStatement, variables), 1e-6);
+    }
+    else {
+      Assert.assertEquals(reason, expected, run(stellarStatement, variables));
+    }
+  }
+
+  public static class XRange extends Spliterators.AbstractIntSpliterator {
+    int end;
+    int i = 0;
+
+    public XRange(int start, int end) {
+      super(end - start, 0);
+      i = start;
+      this.end = end;
+    }
+
+    public XRange(int end) {
+      this(0, end);
+    }
+
+    @Override
+    public boolean tryAdvance(IntConsumer action) {
+      boolean isDone = i >= end;
+      if(isDone) {
+        return false;
+      }
+      else {
+        action.accept(i);
+        i++;
+        return true;
+      }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @param action
+     * @implSpec If the action is an instance of {@code IntConsumer} then it 
is cast
+     * to {@code IntConsumer} and passed to
+     * {@link #tryAdvance(IntConsumer)}; otherwise
+     * the action is adapted to an instance of {@code IntConsumer}, by
+     * boxing the argument of {@code IntConsumer}, and then passed to
+     * {@link #tryAdvance(IntConsumer)}.
+     */
+    @Override
+    public boolean tryAdvance(Consumer<? super Integer> action) {
+      boolean isDone = i >= end;
+      if(isDone) {
+        return false;
+      }
+      else {
+        action.accept(i);
+        i++;
+        return true;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java
new file mode 100644
index 0000000..b70df11
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common.utils;
+
+import org.apache.accumulo.start.classloader.vfs.UniqueFileReplicator;
+import org.apache.commons.vfs2.*;
+import org.apache.commons.vfs2.cache.SoftRefFilesCache;
+import org.apache.commons.vfs2.impl.DefaultFileSystemManager;
+import org.apache.commons.vfs2.impl.FileContentInfoFilenameFactory;
+import org.apache.commons.vfs2.impl.VFSClassLoader;
+import org.apache.commons.vfs2.provider.hdfs.HdfsFileProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.*;
+
+public class VFSClassloaderUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(VFSClassloaderUtil.class);
+
+  /**
+   * Create a FileSystem manager suitable for our purposes.
+   * This manager supports files of the following types:
+   * * res - resource files
+   * * jar
+   * * tar
+   * * bz2
+   * * tgz
+   * * zip
+   * * HDFS
+   * * FTP
+   * * HTTP/S
+   * * file
+   * @return
+   * @throws FileSystemException
+   */
+  public static FileSystemManager generateVfs() throws FileSystemException {
+    DefaultFileSystemManager vfs = new DefaultFileSystemManager();
+    vfs.addProvider("res", new 
org.apache.commons.vfs2.provider.res.ResourceFileProvider());
+    vfs.addProvider("zip", new 
org.apache.commons.vfs2.provider.zip.ZipFileProvider());
+    vfs.addProvider("gz", new 
org.apache.commons.vfs2.provider.gzip.GzipFileProvider());
+    vfs.addProvider("ram", new 
org.apache.commons.vfs2.provider.ram.RamFileProvider());
+    vfs.addProvider("file", new 
org.apache.commons.vfs2.provider.local.DefaultLocalFileProvider());
+    vfs.addProvider("jar", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("http", new 
org.apache.commons.vfs2.provider.http.HttpFileProvider());
+    vfs.addProvider("https", new 
org.apache.commons.vfs2.provider.https.HttpsFileProvider());
+    vfs.addProvider("ftp", new 
org.apache.commons.vfs2.provider.ftp.FtpFileProvider());
+    vfs.addProvider("ftps", new 
org.apache.commons.vfs2.provider.ftps.FtpsFileProvider());
+    vfs.addProvider("war", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("par", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("ear", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("sar", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("ejb3", new 
org.apache.commons.vfs2.provider.jar.JarFileProvider());
+    vfs.addProvider("tmp", new 
org.apache.commons.vfs2.provider.temp.TemporaryFileProvider());
+    vfs.addProvider("tar", new 
org.apache.commons.vfs2.provider.tar.TarFileProvider());
+    vfs.addProvider("tbz2", new 
org.apache.commons.vfs2.provider.tar.TarFileProvider());
+    vfs.addProvider("tgz", new 
org.apache.commons.vfs2.provider.tar.TarFileProvider());
+    vfs.addProvider("bz2", new 
org.apache.commons.vfs2.provider.bzip2.Bzip2FileProvider());
+    vfs.addProvider("hdfs", new HdfsFileProvider());
+    vfs.addExtensionMap("jar", "jar");
+    vfs.addExtensionMap("zip", "zip");
+    vfs.addExtensionMap("gz", "gz");
+    vfs.addExtensionMap("tar", "tar");
+    vfs.addExtensionMap("tbz2", "tar");
+    vfs.addExtensionMap("tgz", "tar");
+    vfs.addExtensionMap("bz2", "bz2");
+    vfs.addMimeTypeMap("application/x-tar", "tar");
+    vfs.addMimeTypeMap("application/x-gzip", "gz");
+    vfs.addMimeTypeMap("application/zip", "zip");
+    vfs.setFileContentInfoFactory(new FileContentInfoFilenameFactory());
+    vfs.setFilesCache(new SoftRefFilesCache());
+    vfs.setReplicator(new UniqueFileReplicator(new 
File(System.getProperty("java.io.tmpdir"))));
+    vfs.setCacheStrategy(CacheStrategy.ON_RESOLVE);
+    vfs.init();
+    return vfs;
+  }
+
+  /**
+   * Create a classloader backed by a virtual filesystem which can handle the 
following URI types:
+   * * res - resource files
+   * * jar
+   * * tar
+   * * bz2
+   * * tgz
+   * * zip
+   * * HDFS
+   * * FTP
+   * * HTTP/S
+   * * file
+   * @param paths A set of comma separated paths.  The paths are URIs or URIs 
with a regex pattern at the end.
+   * @return A classloader object if it can create it
+   * @throws FileSystemException
+   */
+  public static Optional<ClassLoader> configureClassloader(String paths) 
throws FileSystemException {
+    if(paths.trim().isEmpty()) {
+      return Optional.empty();
+    }
+    FileSystemManager vfs = generateVfs();
+    FileObject[] objects = resolve(vfs, paths);
+    if(objects == null || objects.length == 0) {
+      return Optional.empty();
+    }
+    return Optional.of(new VFSClassLoader(objects, vfs, 
vfs.getClass().getClassLoader()));
+  }
+
+  /**
+   * Resolve a set of URIs into FileObject objects.
+   * This is not recursive. The URIs can refer directly to a file or directory 
or an optional regex at the end.
+   * (NOTE: This is NOT a glob).
+   * @param vfs The file system manager to use to resolve URIs
+   * @param uris comma separated URIs and URI + globs
+   * @return
+   * @throws FileSystemException
+   */
+  static FileObject[] resolve(FileSystemManager vfs, String uris) throws 
FileSystemException {
+    if (uris == null) {
+      return new FileObject[0];
+    }
+
+    ArrayList<FileObject> classpath = new ArrayList<>();
+    for (String path : uris.split(",")) {
+      path = path.trim();
+      if (path.equals("")) {
+        continue;
+      }
+      FileObject fo = vfs.resolveFile(path);
+      switch (fo.getType()) {
+        case FILE:
+        case FOLDER:
+          classpath.add(fo);
+          break;
+        case IMAGINARY:
+          // assume its a pattern
+          String pattern = fo.getName().getBaseName();
+          if (fo.getParent() != null && fo.getParent().getType() == 
FileType.FOLDER) {
+            FileObject[] children = fo.getParent().getChildren();
+            for (FileObject child : children) {
+              if (child.getType() == FileType.FILE && 
child.getName().getBaseName().matches(pattern)) {
+                classpath.add(child);
+              }
+            }
+          } else {
+            LOG.warn("ignoring classpath entry " + fo);
+          }
+          break;
+        default:
+          LOG.warn("ignoring classpath entry " + fo);
+          break;
+      }
+    }
+    return classpath.toArray(new FileObject[classpath.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java
new file mode 100644
index 0000000..074158a
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common.utils.cli;
+
+import com.google.common.base.Function;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+import java.util.Optional;
+
+public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements 
Function<String, Option>
+{
+  public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+    return Optional.empty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java
new file mode 100644
index 0000000..92a4129
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl;
+
+import java.util.List;
+
+/**
+ * Functions that do not require initialization can extend this class rather 
than directly implement StellarFunction
+ */
+public abstract class BaseStellarFunction implements StellarFunction {
+  public abstract Object apply(List<Object> args);
+
+  @Override
+  public Object apply(List<Object> args, Context context) throws 
ParseException {
+    return apply(args);
+  }
+
+  @Override
+  public void initialize(Context context) {
+
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+}

Reply via email to