// http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java new file mode 100644 index 0000000..2acd058 --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/PausableInput.java @@ -0,0 +1,372 @@
/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */
package org.apache.metron.stellar.common.shell;

import java.io.IOException;
import java.io.InputStream;

/**
 * An input stream which mirrors System.in, but allows you to 'pause' and 'unpause' it.
 * The Aeshell has an external thread which is constantly polling System.in.  If you
 * need to spawn a program externally (i.e. an editor) which shares stdin, this thread
 * and the spawned program both share a buffer.  This causes contention and unpredictable
 * results (e.g. an input may be consumed by either the aeshell thread or the spawned program).
 *
 * <p>Because you can inject an input stream into the console, we create this which can act as a
 * facade to System.in under normal 'unpaused' circumstances, and when paused, turn off the
 * access to System.in.  This allows us to turn off access to aeshell while maintaining access
 * to the external program.
 *
 * <p>Only the array-based reads ({@link #read(byte[])} and {@link #read(byte[], int, int)})
 * honour the paused state.  Every other method, including the single-byte {@link #read()},
 * always delegates straight to the wrapped stream.
 *
 * <p>The paused flag is {@code volatile} because it is set by the thread that calls
 * {@link #pause()} / {@link #unpause()} and read by whichever thread is polling this stream.
 */
public class PausableInput extends InputStream {

  /** How long a read issued while paused waits before reporting that nothing was read. */
  private static final long PAUSED_READ_DELAY_MILLIS = 1000L;

  /** The stream being mirrored; stdin. */
  InputStream in = System.in;

  /** True while array reads must not touch the wrapped stream. */
  volatile boolean paused = false;

  private PausableInput() {
    super();
  }

  /**
   * Stop mirroring stdin.
   */
  public void pause() {
    paused = true;
  }

  /**
   * Resume mirroring stdin.
   *
   * <p>Before resuming, the bytes that the wrapped stream currently reports as available
   * are read into a throw-away buffer and discarded (presumably so that input left over
   * from the paused period does not reach the console; confirm against callers).
   *
   * @throws IOException if querying or draining the wrapped stream fails.
   */
  public void unpause() throws IOException {
    int pending = in.available();
    if (pending > 0) {
      in.read(new byte[pending]);
    }
    paused = false;
  }

  public final static PausableInput INSTANCE = new PausableInput();

  /**
   * Waits out one polling interval on behalf of a read that arrived while paused.
   *
   * <p>If the waiting thread is interrupted, the interrupt status is restored so that the
   * owner of the thread can still observe it; the wait simply ends early.
   */
  private void waitWhilePaused() {
    try {
      Thread.sleep(PAUSED_READ_DELAY_MILLIS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Reads a single byte from the wrapped stream.
   *
   * <p>This method does not check the paused flag; it always delegates.
   *
   * @return the next byte of data, or <code>-1</code> if the end of the
   * stream is reached.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public int read() throws IOException {
    return in.read();
  }

  /**
   * Reads bytes from the wrapped stream into <code>b</code>, unless paused.
   *
   * <p>While paused, the wrapped stream is not touched: the call waits for about one
   * second and then returns <code>0</code>, leaving <code>b</code> unmodified.
   *
   * @param b the buffer into which the data is read.
   * @return <code>0</code> when paused; otherwise whatever the wrapped stream's
   * <code>read(b)</code> returns (the number of bytes read, or <code>-1</code> at
   * end of stream).
   * @throws IOException if the wrapped stream reports an I/O error.
   * @see InputStream#read(byte[], int, int)
   */
  @Override
  public int read(byte[] b) throws IOException {
    if (paused) {
      waitWhilePaused();
      return 0;
    }
    return in.read(b);
  }

  /**
   * Reads up to <code>len</code> bytes from the wrapped stream into <code>b</code>
   * starting at <code>off</code>, unless paused.
   *
   * <p>While paused, the wrapped stream is not touched: the call waits for about one
   * second and then returns <code>0</code>, leaving <code>b</code> unmodified.
   *
   * @param b   the buffer into which the data is read.
   * @param off the start offset in array <code>b</code>
   *            at which the data is written.
   * @param len the maximum number of bytes to read.
   * @return <code>0</code> when paused; otherwise whatever the wrapped stream's
   * <code>read(b, off, len)</code> returns (the number of bytes read, or
   * <code>-1</code> at end of stream).
   * @throws IOException if the wrapped stream reports an I/O error.
   * @see InputStream#read()
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (paused) {
      waitWhilePaused();
      return 0;
    }
    return in.read(b, off, len);
  }

  /**
   * Skips over bytes of the wrapped stream.  Delegates regardless of the paused state.
   *
   * @param n the number of bytes to be skipped.
   * @return the actual number of bytes skipped.
   * @throws IOException if the wrapped stream reports an I/O error.
   */
  @Override
  public long skip(long n) throws IOException {
    return in.skip(n);
  }

  /**
   * Reports how many bytes the wrapped stream says can be read without blocking.
   * Delegates regardless of the paused state.
   *
   * @return the wrapped stream's estimate of the number of readable bytes.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public int available() throws IOException {
    return in.available();
  }

  /**
   * Closes the wrapped stream.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void close() throws IOException {
    in.close();
  }

  /**
   * Marks the current position in the wrapped stream.
   *
   * @param readlimit the maximum limit of bytes that can be read before
   *                  the mark position becomes invalid.
   * @see InputStream#reset()
   */
  @Override
  public synchronized void mark(int readlimit) {
    in.mark(readlimit);
  }

  /**
   * Repositions the wrapped stream to the position at the time
   * <code>mark</code> was last called on it.
   *
   * @throws IOException if the wrapped stream rejects the reset, e.g. because it
   * has not been marked or the mark has been invalidated.
   * @see InputStream#mark(int)
   */
  @Override
  public synchronized void reset() throws IOException {
    in.reset();
  }

  /**
   * Tests whether the wrapped stream supports the <code>mark</code> and
   * <code>reset</code> methods.
   *
   * @return <code>true</code> if the wrapped stream supports mark and reset;
   * <code>false</code> otherwise.
   * @see InputStream#mark(int)
   * @see InputStream#reset()
   */
  @Override
  public boolean markSupported() {
    return in.markSupported();
  }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java new file mode 100644 index 0000000..1181c6f --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.stellar.common.shell; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import org.apache.commons.collections4.trie.PatriciaTrie; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.stellar.common.configuration.ConfigurationsUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.MapVariableResolver; +import org.apache.metron.stellar.dsl.StellarFunctionInfo; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.stellar.dsl.VariableResolver; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.stellar.common.StellarProcessor; +import org.apache.metron.stellar.common.utils.JSONUtils; +import org.jboss.aesh.console.Console; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.SortedMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.metron.stellar.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper; +import static org.apache.metron.stellar.common.shell.StellarExecutor.OperationType.DOC; +import static org.apache.metron.stellar.common.shell.StellarExecutor.OperationType.NORMAL; + +import static org.apache.metron.stellar.dsl.Context.Capabilities.*; + +/** + * Executes Stellar expressions and maintains state across multiple invocations. 
+ */ +public class StellarExecutor { + + public static String SHELL_VARIABLES = "shellVariables"; + public static String CONSOLE = "console"; + + private ReadWriteLock indexLock = new ReentrantReadWriteLock(); + + public static class VariableResult { + private String expression; + private Object result; + + public VariableResult(String expression, Object result) { + this.expression = expression; + this.result = result; + } + + public String getExpression() { + return expression; + } + + public Object getResult() { + return result; + } + + @Override + public String toString() { + String ret = "" + result; + if(expression != null) { + ret += " via " + expression; + } + return ret; + } + } + + /** + * prefix tree index of autocompletes + */ + private PatriciaTrie<AutoCompleteType> autocompleteIndex; + /** + * The variables known by Stellar. + */ + private Map<String, VariableResult> variables; + + /** + * The function resolver. + */ + private FunctionResolver functionResolver; + + /** + * A Zookeeper client. Only defined if given a valid Zookeeper URL. + */ + private Optional<CuratorFramework> client; + + /** + * The Stellar execution context. 
+ */ + private Context context; + + private Console console; + + public enum OperationType { + DOC,MAGIC,NORMAL; + } + + public interface AutoCompleteTransformation { + String transform(OperationType type, String key); + } + + public enum AutoCompleteType implements AutoCompleteTransformation{ + FUNCTION((type, key) -> { + if(type == DOC) { + return StellarShell.DOC_PREFIX + key; + } + else if(type == NORMAL) { + return key + "("; + } + return key; + }) + , VARIABLE((type, key) -> key ) + , TOKEN((type, key) -> key) + ; + AutoCompleteTransformation transform; + AutoCompleteType(AutoCompleteTransformation transform) { + this.transform = transform; + } + + @Override + public String transform(OperationType type, String key) { + return transform.transform(type, key); + } + + } + + /** + * @param console The console used to drive the REPL. + * @param properties The Stellar properties. + * @throws Exception + */ + public StellarExecutor(Console console, Properties properties) throws Exception { + this(null, console, properties); + } + + /** + * @param console The console used to drive the REPL. + * @param properties The Stellar properties. + * @throws Exception + */ + public StellarExecutor(String zookeeperUrl, Console console, Properties properties) throws Exception { + this.variables = new HashMap<>(); + this.client = createClient(zookeeperUrl); + this.context = createContext(properties); + + // initialize the default function resolver + StellarFunctions.initialize(this.context); + this.functionResolver = StellarFunctions.FUNCTION_RESOLVER(); + + this.autocompleteIndex = initializeIndex(); + this.console = console; + + // asynchronously update the index with function names found from a classpath scan. 
+ new Thread( () -> { + Iterable<StellarFunctionInfo> functions = functionResolver.getFunctionInfo(); + indexLock.writeLock().lock(); + try { + for(StellarFunctionInfo info: functions) { + String functionName = info.getName(); + autocompleteIndex.put(functionName, AutoCompleteType.FUNCTION); + } + } + finally { + System.out.println("Functions loaded, you may refer to functions now..."); + indexLock.writeLock().unlock(); + } + }).start(); + } + + private PatriciaTrie<AutoCompleteType> initializeIndex() { + Map<String, AutoCompleteType> index = new HashMap<>(); + + index.put("==", AutoCompleteType.TOKEN); + index.put(">=", AutoCompleteType.TOKEN); + index.put("<=", AutoCompleteType.TOKEN); + index.put(":=", AutoCompleteType.TOKEN); + index.put("quit", AutoCompleteType.TOKEN); + index.put(StellarShell.MAGIC_FUNCTIONS, AutoCompleteType.FUNCTION); + index.put(StellarShell.MAGIC_VARS, AutoCompleteType.FUNCTION); + return new PatriciaTrie<>(index); + } + + public Iterable<String> autoComplete(String buffer, final OperationType opType) { + indexLock.readLock().lock(); + try { + SortedMap<String, AutoCompleteType> ret = autocompleteIndex.prefixMap(buffer); + if (ret.isEmpty()) { + return new ArrayList<>(); + } + return Iterables.transform(ret.entrySet(), kv -> kv.getValue().transform(opType, kv.getKey())); + } + finally { + indexLock.readLock().unlock(); + } + } + + /** + * Creates a Zookeeper client. + * @param zookeeperUrl The Zookeeper URL. + */ + private Optional<CuratorFramework> createClient(String zookeeperUrl) { + + // can only create client, if have valid zookeeper URL + if(StringUtils.isNotBlank(zookeeperUrl)) { + CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl); + client.start(); + return Optional.of(client); + + } else { + return Optional.empty(); + } + } + + /** + * Creates a Context initialized with configuration stored in Zookeeper. 
+ */ + private Context createContext(Properties properties) throws Exception { + + Context.Builder contextBuilder = new Context.Builder() + .with(SHELL_VARIABLES, () -> variables) + .with(CONSOLE, () -> console) + .with(STELLAR_CONFIG, () -> properties); + + // load global configuration from zookeeper + if (client.isPresent()) { + + // fetch the global configuration + Map<String, Object> global = JSONUtils.INSTANCE.load( + new ByteArrayInputStream(readGlobalConfigBytesFromZookeeper(client.get())), + new TypeReference<Map<String, Object>>() {}); + + contextBuilder + .with(GLOBAL_CONFIG, () -> global) + .with(ZOOKEEPER_CLIENT, () -> client.get()) + .with(STELLAR_CONFIG, () -> getStellarConfig(global, properties)); + } + + return contextBuilder.build(); + } + + private Map<String, Object> getStellarConfig(Map<String, Object> globalConfig, Properties props) { + Map<String, Object> ret = new HashMap<>(); + ret.putAll(globalConfig); + if(props != null) { + for (Map.Entry<Object, Object> kv : props.entrySet()) { + ret.put(kv.getKey().toString(), kv.getValue()); + } + } + return ret; + } + + /** + * Executes the Stellar expression and returns the result. + * @param expression The Stellar expression to execute. + * @return The result of the expression. + */ + public Object execute(String expression) { + VariableResolver variableResolver = new MapVariableResolver(Maps.transformValues(variables, result -> result.getResult()) + , Collections.emptyMap()); + StellarProcessor processor = new StellarProcessor(); + return processor.parse(expression, variableResolver, functionResolver, context); + } + + /** + * Assigns a value to a variable. + * @param variable The name of the variable. 
+ * @param value The value of the variable + */ + public void assign(String variable, String expression, Object value) { + this.variables.put(variable, new VariableResult(expression, value)); + indexLock.writeLock().lock(); + try { + if (value != null) { + this.autocompleteIndex.put(variable, AutoCompleteType.VARIABLE); + } else { + this.autocompleteIndex.remove(variable); + } + } + finally { + indexLock.writeLock().unlock(); + } + } + + public Map<String, VariableResult> getVariables() { + return this.variables; + } + + public FunctionResolver getFunctionResolver() { + return functionResolver; + } + + public Context getContext() { + return context; + } +} + http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java new file mode 100644 index 0000000..1b51415 --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java @@ -0,0 +1,440 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.stellar.common.shell; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctionInfo; +import org.apache.metron.stellar.common.StellarAssignment; +import org.apache.metron.stellar.common.utils.JSONUtils; +import org.jboss.aesh.complete.CompleteOperation; +import org.jboss.aesh.complete.Completion; +import org.jboss.aesh.console.AeshConsoleCallback; +import org.jboss.aesh.console.Console; +import org.jboss.aesh.console.ConsoleOperation; +import org.jboss.aesh.console.Prompt; +import org.jboss.aesh.console.settings.SettingsBuilder; +import org.jboss.aesh.terminal.CharacterType; +import org.jboss.aesh.terminal.Color; +import org.jboss.aesh.terminal.TerminalCharacter; +import org.jboss.aesh.terminal.TerminalColor; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * A REPL 
environment for Stellar. + * + * Useful for debugging Stellar expressions. + */ +public class StellarShell extends AeshConsoleCallback implements Completion { + + private static final String WELCOME = "Stellar, Go!\n" + + "Please note that functions are loading lazily in the background and will be unavailable until loaded fully."; + private List<TerminalCharacter> EXPRESSION_PROMPT = new ArrayList<TerminalCharacter>() + {{ + add(new TerminalCharacter('[', new TerminalColor(Color.RED, Color.DEFAULT))); + add(new TerminalCharacter('S', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('t', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('e', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('a', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter('r', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD)); + add(new TerminalCharacter(']', new TerminalColor(Color.RED, Color.DEFAULT))); + add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE)); + add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE)); + add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE)); + add(new TerminalCharacter(' ', new TerminalColor(Color.DEFAULT, Color.DEFAULT))); + }}; + + public static final String ERROR_PROMPT = "[!] 
"; + public static final String MAGIC_PREFIX = "%"; + public static final String MAGIC_FUNCTIONS = MAGIC_PREFIX + "functions"; + public static final String MAGIC_VARS = MAGIC_PREFIX + "vars"; + public static final String DOC_PREFIX = "?"; + public static final String STELLAR_PROPERTIES_FILENAME = "stellar.properties"; + + private StellarExecutor executor; + + private Console console; + + /** + * Execute the Stellar REPL. + */ + public static void main(String[] args) throws Exception { + StellarShell shell = new StellarShell(args); + shell.execute(); + } + + /** + * Create a Stellar REPL. + * @param args The commmand-line arguments. + */ + public StellarShell(String[] args) throws Exception { + + // define valid command-line options + Options options = new Options(); + options.addOption("z", "zookeeper", true, "Zookeeper URL"); + options.addOption("v", "variables", true, "File containing a JSON Map of variables"); + options.addOption("irc", "inputrc", true, "File containing the inputrc if not the default ~/.inputrc"); + options.addOption("na", "no_ansi", false, "Make the input prompt not use ANSI colors."); + options.addOption("h", "help", false, "Print help"); + options.addOption("p", "properties", true, "File containing Stellar properties"); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = parser.parse(options, args); + + // print help + if(commandLine.hasOption("h")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("stellar", options); + System.exit(0); + } + + console = createConsole(commandLine); + executor = createExecutor(commandLine, console, getStellarProperties(commandLine)); + loadVariables(commandLine, executor); + console.setPrompt(new Prompt(EXPRESSION_PROMPT)); + console.addCompletion(this); + console.setConsoleCallback(this); + } + + /** + * Loads any variables defined in an external file. + * @param commandLine The command line arguments. + * @param executor The stellar executor. 
+ * @throws IOException + */ + private static void loadVariables(CommandLine commandLine, StellarExecutor executor) throws IOException { + if(commandLine.hasOption("v")) { + + Map<String, Object> variables = JSONUtils.INSTANCE.load( + new File(commandLine.getOptionValue("v")), + new TypeReference<Map<String, Object>>() {}); + + for(Map.Entry<String, Object> kv : variables.entrySet()) { + executor.assign(kv.getKey(), null, kv.getValue()); + } + } + } + + /** + * Creates the Stellar execution environment. + * @param commandLine The command line arguments. + * @param console The console which drives the REPL. + * @param properties Stellar properties. + */ + private static StellarExecutor createExecutor(CommandLine commandLine, Console console, Properties properties) throws Exception { + StellarExecutor executor; + + // create the executor + if(commandLine.hasOption("z")) { + String zookeeperUrl = commandLine.getOptionValue("z"); + executor = new StellarExecutor(zookeeperUrl, console, properties); + + } else { + executor = new StellarExecutor(console, properties); + } + + return executor; + } + + /** + * Creates the REPL's console. + * @param commandLine The command line options. + */ + private Console createConsole(CommandLine commandLine) { + + // console settings + boolean useAnsi = !commandLine.hasOption("na"); + SettingsBuilder settings = new SettingsBuilder().enableAlias(true) + .enableMan(true) + .ansi(useAnsi) + .parseOperators(false) + .inputStream(PausableInput.INSTANCE); + + if(commandLine.hasOption("irc")) { + settings = settings.inputrc(new File(commandLine.getOptionValue("irc"))); + } + + return new Console(settings.create()); + } + + /** + * Retrieves the Stellar properties. The properties are either loaded from a file in + * the classpath or a set of defaults are used. 
+ */ + private Properties getStellarProperties(CommandLine commandLine) throws IOException { + Properties properties = new Properties(); + + if (commandLine.hasOption("p")) { + + // first attempt to load properties from a file specified on the command-line + try (InputStream in = new FileInputStream(commandLine.getOptionValue("p"))) { + if(in != null) { + properties.load(in); + } + } + + } else { + + // otherwise attempt to load properties from the classpath + try (InputStream in = getClass().getClassLoader().getResourceAsStream(STELLAR_PROPERTIES_FILENAME)) { + if(in != null) { + properties.load(in); + } + } + } + + return properties; + } + + /** + * Handles the main loop for the REPL. + */ + public void execute() { + + // welcome message and print globals + writeLine(WELCOME); + executor.getContext() + .getCapability(Context.Capabilities.GLOBAL_CONFIG, false) + .ifPresent(conf -> writeLine(conf.toString())); + + console.start(); + } + + /** + * Handles user interaction when executing a Stellar expression. + * @param expression The expression to execute. + */ + private void handleStellar(String expression) { + + String stellarExpression = expression; + String variable = null; + if(StellarAssignment.isAssignment(expression)) { + StellarAssignment expr = StellarAssignment.from(expression); + variable = expr.getVariable(); + stellarExpression = expr.getStatement(); + } + else { + if (!stellarExpression.isEmpty()) { + stellarExpression = stellarExpression.trim(); + } + } + Object result = executeStellar(stellarExpression); + if(result != null && variable == null) { + writeLine(result.toString()); + } + if(variable != null) { + executor.assign(variable, stellarExpression, result); + } + } + + /** + * Handles user interaction when executing a Magic command. + * @param rawExpression The expression to execute. 
+ */ + private void handleMagic( String rawExpression) { + String expression = rawExpression.trim(); + if(MAGIC_FUNCTIONS.equals(expression)) { + + // list all functions + String functions = StreamSupport + .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false) + .map(info -> String.format("%s", info.getName())) + .sorted() + .collect(Collectors.joining(", ")); + writeLine(functions); + + } else if(MAGIC_VARS.equals(expression)) { + + // list all variables + + executor.getVariables() + .forEach((k,v) -> writeLine(String.format("%s = %s", k, v))); + + } else { + writeLine(ERROR_PROMPT + "undefined magic command: " + expression); + } + } + + /** + * Handles user interaction when executing a doc command. + * @param expression The expression to execute. + */ + private void handleDoc(String expression) { + + String functionName = StringUtils.substring(expression, 1); + StreamSupport + .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false) + .filter(info -> StringUtils.equals(functionName, info.getName())) + .map(info -> format(info)) + .forEach(doc -> write(doc)); + } + + /** + * Formats the Stellar function info object into a readable string. + * @param info The stellar function info object. + * @return A readable string. + */ + private String format(StellarFunctionInfo info) { + StringBuffer ret = new StringBuffer(); + ret.append(info.getName() + "\n"); + ret.append(String.format("Description: %-60s\n\n", info.getDescription())); + if(info.getParams().length > 0) { + ret.append("Arguments:\n"); + for(String param : info.getParams()) { + ret.append(String.format("\t%-60s\n", param)); + } + ret.append("\n"); + } + ret.append(String.format("Returns: %-60s\n", info.getReturns())); + + return ret.toString(); + } + + /** + * Is a given expression a built-in magic? + * @param expression The expression. 
+ */ + private boolean isMagic(String expression) { + return StringUtils.startsWith(expression, MAGIC_PREFIX); + } + + /** + * Is a given expression asking for function documentation? + * @param expression The expression. + */ + private boolean isDoc(String expression) { + return StringUtils.startsWith(expression, DOC_PREFIX); + } + + /** + * Executes a Stellar expression. + * @param expression The expression to execute. + * @return The result of the expression. + */ + private Object executeStellar(String expression) { + Object result = null; + + try { + result = executor.execute(expression); + + } catch(Throwable t) { + writeLine(ERROR_PROMPT + t.getMessage()); + t.printStackTrace(); + } + + return result; + } + + private void write(String out) { + System.out.print(out); + } + + private void writeLine(String out) { + console.getShell().out().println(out); + } + + @Override + public int execute(ConsoleOperation output) throws InterruptedException { + String expression = output.getBuffer().trim(); + if(StringUtils.isNotBlank(expression) ) { + if(isMagic(expression)) { + handleMagic( expression); + + } else if(isDoc(expression)) { + handleDoc(expression); + + } else if (expression.equals("quit")) { + try { + console.stop(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + else if(expression.charAt(0) == '#') { + return 0; + } + else { + handleStellar(expression); + } + } + + return 0; + } + + @Override + public void complete(CompleteOperation completeOperation) { + if(!completeOperation.getBuffer().isEmpty()) { + String lastToken = Iterables.getLast(Splitter.on(" ").split(completeOperation.getBuffer()), null); + if(lastToken != null && !lastToken.isEmpty()) { + lastToken = lastToken.trim(); + final String lastBit = lastToken; + final boolean isDocRequest = isDoc(lastToken); + if(isDocRequest) { + lastToken = lastToken.substring(1); + } + StellarExecutor.OperationType opType = StellarExecutor.OperationType.NORMAL; + if(isDocRequest) { + opType = 
StellarExecutor.OperationType.DOC; + } + else if(isMagic(lastToken)) { + opType = StellarExecutor.OperationType.MAGIC; + } + Iterable<String> candidates = executor.autoComplete(lastToken, opType); + if(candidates != null && !Iterables.isEmpty(candidates)) { + completeOperation.setCompletionCandidates( Lists.newArrayList( + Iterables.transform(candidates, s -> stripOff(completeOperation.getBuffer(), lastBit) + s ) + ) + ); + } + } + } + + } + + private static String stripOff(String baseString, String lastBit) { + int index = baseString.lastIndexOf(lastBit); + if(index < 0) { + return baseString; + } + return baseString.substring(0, index); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java new file mode 100644 index 0000000..66c9e9d --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/system/Clock.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Small wall-clock abstraction. Time is read through the overridable
 * {@link #currentTimeMillis()} so that a subclass or mock can supply a fixed instant.
 */
public class Clock {
  /** Time zone id applied to every formatted timestamp. */
  private static final String UTC_ZONE_ID = "UTC";

  /**
   * @return the current time in epoch milliseconds, as reported by
   *         {@link System#currentTimeMillis()}.
   */
  public long currentTimeMillis() {
    return System.currentTimeMillis();
  }

  /**
   * Formats the instant returned by {@link #currentTimeMillis()} in UTC.
   *
   * @param stdDateFormat a {@link SimpleDateFormat} pattern
   * @return the current time rendered with the given pattern
   */
  public String currentTimeFormatted(String stdDateFormat) {
    // The formatter is built before the clock is read, so an invalid pattern
    // fails before currentTimeMillis() is consulted.
    final SimpleDateFormat utcFormatter = newUtcFormatter(stdDateFormat);
    final Date now = new Date(currentTimeMillis());
    return utcFormatter.format(now);
  }

  /** Builds a formatter for the pattern with its time zone pinned to UTC. */
  private static SimpleDateFormat newUtcFormatter(String pattern) {
    final SimpleDateFormat formatter = new SimpleDateFormat(pattern);
    formatter.setTimeZone(TimeZone.getTimeZone(UTC_ZONE_ID));
    return formatter;
  }

}
/**
 * Indirection over the process environment, so that code reading environment
 * variables can be handed a mock in tests.
 */
public class Environment {
  /**
   * Looks up an environment variable.
   *
   * @param variable the name of the environment variable
   * @return the value found in {@link System#getenv()} for that name, or
   *         {@code null} when the map has no such entry
   */
  public String get(String variable) {
    final java.util.Map<String, String> processEnvironment = System.getenv();
    return processEnvironment.get(variable);
  }
}
+ */ +package org.apache.metron.stellar.common.utils; + +import com.google.common.hash.Funnel; +import com.google.common.hash.PrimitiveSink; + +import java.io.Serializable; +import java.util.function.Function; + +public class BloomFilter<T> implements Serializable { + + private static class BloomFunnel<T> implements Funnel<T>, Serializable { + Function<T, byte[]> serializer; + public BloomFunnel(Function<T, byte[]> serializer) { + this.serializer = serializer; + } + @Override + public void funnel(T obj, PrimitiveSink primitiveSink) { + primitiveSink.putBytes(serializer.apply(obj)); + } + + @Override + public boolean equals(Object obj) { + return this.getClass().equals(obj.getClass()); + } + + @Override + public int hashCode() { + return super.hashCode() * 31; + } + } + + public static class DefaultSerializer<T> implements Function<T, byte[]> { + @Override + public byte[] apply(T t) { + return SerDeUtils.toBytes(t); + } + } + private com.google.common.hash.BloomFilter<T> filter; + + public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) { + filter = com.google.common.hash.BloomFilter.create(new BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate); + } + + public boolean mightContain(T key) { + return filter.mightContain(key); + } + public void add(T key) { + filter.put(key); + } + public void merge(BloomFilter<T> filter2) { + filter.putAll(filter2.filter); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BloomFilter<?> that = (BloomFilter<?>) o; + + return filter != null ? filter.equals(that.filter) : that.filter == null; + + } + + @Override + public int hashCode() { + return filter != null ? 
filter.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java new file mode 100644 index 0000000..b53097f --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.stellar.common.utils; + +import com.google.common.collect.Lists; +import org.apache.commons.beanutils.BeanUtilsBean2; +import org.apache.commons.beanutils.ConvertUtilsBean; + +import java.util.List; + +public class ConversionUtils { + private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN = new ThreadLocal<ConvertUtilsBean>() { + @Override + protected ConvertUtilsBean initialValue() { + ConvertUtilsBean ret = BeanUtilsBean2.getInstance().getConvertUtils(); + ret.deregister(); + ret.register(false, true, 1); + return ret; + } + }; + + public static <T> T convert(Object o, Class<T> clazz) { + if (o == null) { + return null; + } + return clazz.cast(UTILS_BEAN.get().convert(o, clazz)); + } + + /** + * Performs naive List type conversion. + * + * @param from Source list + * @param clazz Class type to cast the List elements to + * @param <T> Source element type + * @param <U> Desired element type + * @return New List with the elements cast to the desired type + */ + public static <T, U> List<U> convertList(List<T> from, Class<U> clazz) { + return Lists.transform(from, s -> convert(s, clazz)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java new file mode 100644 index 0000000..d99263b --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.stellar.common.utils; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + +import java.io.*; + +public enum JSONUtils { + INSTANCE; + + private static ThreadLocal<JSONParser> _parser = ThreadLocal.withInitial(() -> + new JSONParser()); + + private static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() -> + new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL)); + + public <T> T load(InputStream is, TypeReference<T> ref) throws IOException { + return _mapper.get().readValue(is, ref); + } + + public <T> T load(String is, TypeReference<T> ref) throws IOException { + return _mapper.get().readValue(is, ref); + } + + public <T> T load(File f, TypeReference<T> ref) throws IOException { + try (InputStream is = new BufferedInputStream(new FileInputStream(f))) { + return _mapper.get().readValue(is, ref); + } + } + + public <T> T load(InputStream is, Class<T> clazz) throws IOException { + return _mapper.get().readValue(is, clazz); + } + + public <T> T load(File f, Class<T> clazz) throws IOException { + try (InputStream is = new 
BufferedInputStream(new FileInputStream(f))) { + return _mapper.get().readValue(is, clazz); + } + } + + public <T> T load(String is, Class<T> clazz) throws IOException { + return _mapper.get().readValue(is, clazz); + } + + public String toJSON(Object o, boolean pretty) throws JsonProcessingException { + if (pretty) { + return _mapper.get().writerWithDefaultPrettyPrinter().writeValueAsString(o); + } else { + return _mapper.get().writeValueAsString(o); + } + } + + public byte[] toJSON(Object config) throws JsonProcessingException { + return _mapper.get().writeValueAsBytes(config); + } + + /** + * Transforms a bean (aka POJO) to a JSONObject. + */ + public JSONObject toJSONObject(Object o) throws JsonProcessingException, ParseException { + return (JSONObject) _parser.get().parse(toJSON(o, false)); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java new file mode 100644 index 0000000..7bd9520 --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.stellar.common.utils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.Util; +import com.esotericsoftware.reflectasm.ConstructorAccess; +import de.javakaffee.kryoserializers.*; +import de.javakaffee.kryoserializers.cglib.CGLibProxySerializer; +import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer; +import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer; +import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer; +import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.objenesis.instantiator.ObjectInstantiator; +import org.objenesis.strategy.InstantiatorStrategy; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Modifier; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.*; +import java.util.function.Function; + +import static com.esotericsoftware.kryo.util.Util.className; + +/** + * Provides basic functionality to serialize and deserialize the 
allowed + * value types for a ProfileMeasurement. + */ +public class SerDeUtils { + protected static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class); + private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() { + @Override + protected Kryo initialValue() { + Kryo ret = new Kryo(); + ret.setReferences(true); + ret.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + + ret.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); + ret.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer()); + ret.register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer()); + ret.register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer()); + ret.register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer()); + ret.register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer()); + ret.register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer()); + ret.register(GregorianCalendar.class, new GregorianCalendarSerializer()); + ret.register(InvocationHandler.class, new JdkProxySerializer()); + UnmodifiableCollectionsSerializer.registerSerializers(ret); + SynchronizedCollectionsSerializer.registerSerializers(ret); + +// custom serializers for non-jdk libs + +// register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below) + ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer()); +// joda DateTime, LocalDate and LocalDateTime + ret.register(LocalDate.class, new JodaLocalDateSerializer()); + ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer()); +// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet + ImmutableListSerializer.registerSerializers(ret); + ImmutableSetSerializer.registerSerializers(ret); + 
ImmutableMapSerializer.registerSerializers(ret); + ImmutableMultimapSerializer.registerSerializers(ret); + return ret; + } + }; + + /** + * This was backported from a more recent version of kryo than we currently run. The reason why it exists is + * that we want a strategy for instantiation of classes which attempts a no-arg constructor first and THEN falls + * back to reflection for performance reasons alone (this is, after all, in the critical path). + * + */ + static private class DefaultInstantiatorStrategy implements org.objenesis.strategy.InstantiatorStrategy { + private InstantiatorStrategy fallbackStrategy; + + public DefaultInstantiatorStrategy () { + } + + public DefaultInstantiatorStrategy (InstantiatorStrategy fallbackStrategy) { + this.fallbackStrategy = fallbackStrategy; + } + + public void setFallbackInstantiatorStrategy (final InstantiatorStrategy fallbackStrategy) { + this.fallbackStrategy = fallbackStrategy; + } + + public InstantiatorStrategy getFallbackInstantiatorStrategy () { + return fallbackStrategy; + } + + @Override + public ObjectInstantiator newInstantiatorOf (final Class type) { + if (!Util.isAndroid) { + // Use ReflectASM if the class is not a non-static member class. + Class enclosingType = type.getEnclosingClass(); + boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass() + && !Modifier.isStatic(type.getModifiers()); + if (!isNonStaticMemberClass) { + try { + final ConstructorAccess access = ConstructorAccess.get(type); + return new ObjectInstantiator() { + @Override + public Object newInstance () { + try { + return access.newInstance(); + } catch (Exception ex) { + throw new KryoException("Error constructing instance of class: " + className(type), ex); + } + } + }; + } catch (Exception ignored) { + } + } + } + // Reflection. 
+ try { + Constructor ctor; + try { + ctor = type.getConstructor((Class[])null); + } catch (Exception ex) { + ctor = type.getDeclaredConstructor((Class[])null); + ctor.setAccessible(true); + } + final Constructor constructor = ctor; + return new ObjectInstantiator() { + @Override + public Object newInstance () { + try { + return constructor.newInstance(); + } catch (Exception ex) { + throw new KryoException("Error constructing instance of class: " + className(type), ex); + } + } + }; + } catch (Exception ignored) { + } + if (fallbackStrategy == null) { + if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers())) + throw new KryoException("Class cannot be created (non-static member class): " + className(type)); + else + throw new KryoException("Class cannot be created (missing no-arg constructor): " + className(type)); + } + // InstantiatorStrategy. + return fallbackStrategy.newInstantiatorOf(type); + } + } + + public static Serializer SERIALIZER = new Serializer(); + + private static class Serializer implements Function<Object, byte[]> { + /** + * Serializes the given Object into bytes. + * + */ + @Override + public byte[] apply(Object o) { + return toBytes(o); + } + } + + public static class Deserializer<T> implements Function<byte[], T> { + + private Class<T> clazz; + public Deserializer(Class<T> clazz) { + this.clazz = clazz; + } + /** + * Deserializes the given bytes. + * + * @param bytes the function argument + * @return the function result + */ + @Override + public T apply(byte[] bytes) { + return fromBytes(bytes, clazz); + } + } + + + private SerDeUtils() { + // do not instantiate + } + + /** + * Serialize a profile measurement's value. + * + * The value produced by a Profile definition can be any numeric data type. The data + * type depends on how the profile is defined by the user. The user should be able to + * choose the data type that is most suitable for their use case. + * + * @param value The value to serialize. 
+ */ + public static byte[] toBytes(Object value) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + kryo.get().writeClassAndObject(output, value); + output.flush(); + bos.flush(); + return bos.toByteArray(); + } + catch(Throwable t) { + LOG.error("Unable to serialize: " + value + " because " + t.getMessage(), t); + throw new IllegalStateException("Unable to serialize " + value + " because " + t.getMessage(), t); + } + } + + /** + * Deserialize a profile measurement's value. + * + * The value produced by a Profile definition can be any numeric data type. The data + * type depends on how the profile is defined by the user. The user should be able to + * choose the data type that is most suitable for their use case. + * + * @param value The value to deserialize. + */ + public static <T> T fromBytes(byte[] value, Class<T> clazz) { + try { + Input input = new Input(new ByteArrayInputStream(value)); + return clazz.cast(kryo.get().readClassAndObject(input)); + } + catch(Throwable t) { + LOG.error("Unable to deserialize because " + t.getMessage(), t); + throw t; + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java new file mode 100644 index 0000000..cc19b5f --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.stellar.common.utils; + +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.MapVariableResolver; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.stellar.dsl.VariableResolver; +import com.google.common.collect.ImmutableList; +import org.apache.metron.stellar.common.StellarPredicateProcessor; +import org.apache.metron.stellar.common.StellarProcessor; +import org.junit.Assert; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.function.IntConsumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class StellarProcessorUtils { + + /** + * This utility class is intended for use while unit testing Stellar operators. + * It is included in the "main" code so third-party operators will not need + * a test dependency on Stellar's test-jar. + * + * This class ensures the basic contract of a stellar expression is adhered to: + * 1. Validate works on the expression + * 2.
The output can be serialized and deserialized properly + * + * Both checks are made with JUnit assertions, so a violation surfaces as an assertion failure. + * + * @param rule The Stellar expression to validate and evaluate + * @param variables The variables visible to the expression, looked up by name + * @param context The context passed to both validation and evaluation + * @return The result of evaluating the expression (the value obtained before the serialization round trip) + */ + public static Object run(String rule, Map<String, Object> variables, Context context) { + StellarProcessor processor = new StellarProcessor(); + Assert.assertTrue(rule + " not valid.", processor.validate(rule, context)); + Object ret = processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context); + byte[] raw = SerDeUtils.toBytes(ret); + Object actual = SerDeUtils.fromBytes(raw, Object.class); + Assert.assertEquals(ret, actual); + return ret; + } + + /** Same as {@code run(String, Map, Context)} using {@code Context.EMPTY_CONTEXT()}. */ public static Object run(String rule, Map<String, Object> variables) { + return run(rule, variables, Context.EMPTY_CONTEXT()); + } + + /** Evaluates the predicate against the given variable map using {@code Context.EMPTY_CONTEXT()}. */ public static boolean runPredicate(String rule, Map resolver) { + return runPredicate(rule, resolver, Context.EMPTY_CONTEXT()); + } + + /** Wraps the map in a {@code MapVariableResolver} and evaluates the predicate with the given context. */ public static boolean runPredicate(String rule, Map resolver, Context context) { + return runPredicate(rule, new MapVariableResolver(resolver), context); + } + + /** Evaluates the predicate with the given resolver using {@code Context.EMPTY_CONTEXT()}. */ public static boolean runPredicate(String rule, VariableResolver resolver) { + return runPredicate(rule, resolver, Context.EMPTY_CONTEXT()); + } + + /** Asserts that the rule validates, then returns the result of parsing it as a predicate. NOTE(review): validate is called without the context here, unlike run(String, Map, Context) - confirm this is intended. */ public static boolean runPredicate(String rule, VariableResolver resolver, Context context) { + StellarPredicateProcessor processor = new StellarPredicateProcessor(); + Assert.assertTrue(rule + " not valid.", processor.validate(rule)); + return processor.parse(rule, resolver, StellarFunctions.FUNCTION_RESOLVER(), context); + } + + /** Single-argument convenience form of {@code runWithArguments(String, List, Object)}. */ public static void runWithArguments(String function, Object argument, Object expected) { + runWithArguments(function, ImmutableList.of(argument), expected); + } + + /** Builds the statement {@code function(var0,var1,...)}, binds {@code varN} to {@code arguments.get(N)}, runs it and asserts the result equals {@code expected} (within 1e-6 when {@code expected} is a Double). */ public static void runWithArguments(String function, List<Object> arguments, Object expected) { + /* a Supplier, because the entries are streamed twice below */ Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport.stream(new XRange(arguments.size()), false) + .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
+ + String args = kvStream.get().map( kv -> kv.getKey()) + .collect(Collectors.joining(",")); + Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue())); + String stellarStatement = function + "(" + args + ")"; + String reason = stellarStatement + " != " + expected + " with variables: " + variables; + + if(expected instanceof Double) { + Assert.assertEquals(reason, (Double)expected, (Double)run(stellarStatement, variables), 1e-6); + } + else { + Assert.assertEquals(reason, expected, run(stellarStatement, variables)); + } + } + + /** Spliterator over the ints from {@code start} (inclusive) to {@code end} (exclusive). */ public static class XRange extends Spliterators.AbstractIntSpliterator { + int end; + int i = 0; + + /** Creates a range that begins at {@code start} and stops before {@code end}. */ public XRange(int start, int end) { + super(end - start, 0); + i = start; + this.end = end; + } + + /** Creates a range that begins at 0 and stops before {@code end}. */ public XRange(int end) { + this(0, end); + } + + /** Passes the current index to {@code action} and advances; returns {@code false} once {@code end} has been reached. */ @Override + public boolean tryAdvance(IntConsumer action) { + boolean isDone = i >= end; + if(isDone) { + return false; + } + else { + action.accept(i); + i++; + return true; + } + } + + /** + * Boxed counterpart of {@code tryAdvance(IntConsumer)}: passes the current index to + * {@code action} and advances, or returns {@code false} once {@code end} has been reached. + * + * @param action receives the current index + * @return {@code true} if an index was passed to {@code action}, {@code false} if the range is exhausted + * @implNote This override does not delegate to {@code tryAdvance(IntConsumer)}; + * it repeats the same logic and lets the index be boxed when it is + * handed to {@code action}. + */ + @Override + public boolean tryAdvance(Consumer<? 
super Integer> action) { + boolean isDone = i >= end; + if(isDone) { + return false; + } + else { + action.accept(i); + i++; + return true; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java new file mode 100644 index 0000000..b70df11 --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/VFSClassloaderUtil.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.metron.stellar.common.utils; + +import org.apache.accumulo.start.classloader.vfs.UniqueFileReplicator; +import org.apache.commons.vfs2.*; +import org.apache.commons.vfs2.cache.SoftRefFilesCache; +import org.apache.commons.vfs2.impl.DefaultFileSystemManager; +import org.apache.commons.vfs2.impl.FileContentInfoFilenameFactory; +import org.apache.commons.vfs2.impl.VFSClassLoader; +import org.apache.commons.vfs2.provider.hdfs.HdfsFileProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.*; + +public class VFSClassloaderUtil { + private static final Logger LOG = LoggerFactory.getLogger(VFSClassloaderUtil.class); + + /** + * Create a FileSystem manager suitable for our purposes. + * This manager supports files of the following types: + * * res - resource files + * * jar + * * tar + * * bz2 + * * tgz + * * zip + * * HDFS + * * FTP + * * HTTP/S + * * file + * @return + * @throws FileSystemException + */ + public static FileSystemManager generateVfs() throws FileSystemException { + DefaultFileSystemManager vfs = new DefaultFileSystemManager(); + vfs.addProvider("res", new org.apache.commons.vfs2.provider.res.ResourceFileProvider()); + vfs.addProvider("zip", new org.apache.commons.vfs2.provider.zip.ZipFileProvider()); + vfs.addProvider("gz", new org.apache.commons.vfs2.provider.gzip.GzipFileProvider()); + vfs.addProvider("ram", new org.apache.commons.vfs2.provider.ram.RamFileProvider()); + vfs.addProvider("file", new org.apache.commons.vfs2.provider.local.DefaultLocalFileProvider()); + vfs.addProvider("jar", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("http", new org.apache.commons.vfs2.provider.http.HttpFileProvider()); + vfs.addProvider("https", new org.apache.commons.vfs2.provider.https.HttpsFileProvider()); + vfs.addProvider("ftp", new org.apache.commons.vfs2.provider.ftp.FtpFileProvider()); + vfs.addProvider("ftps", new 
org.apache.commons.vfs2.provider.ftps.FtpsFileProvider()); + vfs.addProvider("war", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("par", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("ear", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("sar", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("ejb3", new org.apache.commons.vfs2.provider.jar.JarFileProvider()); + vfs.addProvider("tmp", new org.apache.commons.vfs2.provider.temp.TemporaryFileProvider()); + vfs.addProvider("tar", new org.apache.commons.vfs2.provider.tar.TarFileProvider()); + vfs.addProvider("tbz2", new org.apache.commons.vfs2.provider.tar.TarFileProvider()); + vfs.addProvider("tgz", new org.apache.commons.vfs2.provider.tar.TarFileProvider()); + vfs.addProvider("bz2", new org.apache.commons.vfs2.provider.bzip2.Bzip2FileProvider()); + vfs.addProvider("hdfs", new HdfsFileProvider()); + vfs.addExtensionMap("jar", "jar"); + vfs.addExtensionMap("zip", "zip"); + vfs.addExtensionMap("gz", "gz"); + vfs.addExtensionMap("tar", "tar"); + vfs.addExtensionMap("tbz2", "tar"); + vfs.addExtensionMap("tgz", "tar"); + vfs.addExtensionMap("bz2", "bz2"); + vfs.addMimeTypeMap("application/x-tar", "tar"); + vfs.addMimeTypeMap("application/x-gzip", "gz"); + vfs.addMimeTypeMap("application/zip", "zip"); + vfs.setFileContentInfoFactory(new FileContentInfoFilenameFactory()); + vfs.setFilesCache(new SoftRefFilesCache()); + vfs.setReplicator(new UniqueFileReplicator(new File(System.getProperty("java.io.tmpdir")))); + vfs.setCacheStrategy(CacheStrategy.ON_RESOLVE); + vfs.init(); + return vfs; + } + + /** + * Create a classloader backed by a virtual filesystem which can handle the following URI types: + * * res - resource files + * * jar + * * tar + * * bz2 + * * tgz + * * zip + * * HDFS + * * FTP + * * HTTP/S + * * file + * @param paths A set of comma separated paths. 
The paths are URIs or URIs with a regex pattern at the end. + * @return A classloader object if it can create it + * @throws FileSystemException + */ + public static Optional<ClassLoader> configureClassloader(String paths) throws FileSystemException { + if(paths.trim().isEmpty()) { + return Optional.empty(); + } + FileSystemManager vfs = generateVfs(); + FileObject[] objects = resolve(vfs, paths); + if(objects == null || objects.length == 0) { + return Optional.empty(); + } + return Optional.of(new VFSClassLoader(objects, vfs, vfs.getClass().getClassLoader())); + } + + /** + * Resolve a set of URIs into FileObject objects. + * This is not recursive. The URIs can refer directly to a file or directory or an optional regex at the end. + * (NOTE: This is NOT a glob). + * @param vfs The file system manager to use to resolve URIs + * @param uris comma separated URIs and URI + globs + * @return + * @throws FileSystemException + */ + static FileObject[] resolve(FileSystemManager vfs, String uris) throws FileSystemException { + if (uris == null) { + return new FileObject[0]; + } + + ArrayList<FileObject> classpath = new ArrayList<>(); + for (String path : uris.split(",")) { + path = path.trim(); + if (path.equals("")) { + continue; + } + FileObject fo = vfs.resolveFile(path); + switch (fo.getType()) { + case FILE: + case FOLDER: + classpath.add(fo); + break; + case IMAGINARY: + // assume its a pattern + String pattern = fo.getName().getBaseName(); + if (fo.getParent() != null && fo.getParent().getType() == FileType.FOLDER) { + FileObject[] children = fo.getParent().getChildren(); + for (FileObject child : children) { + if (child.getType() == FileType.FILE && child.getName().getBaseName().matches(pattern)) { + classpath.add(child); + } + } + } else { + LOG.warn("ignoring classpath entry " + fo); + } + break; + default: + LOG.warn("ignoring classpath entry " + fo); + break; + } + } + return classpath.toArray(new FileObject[classpath.size()]); + } +} 
http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java new file mode 100644 index 0000000..074158a --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/cli/OptionHandler.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.stellar.common.utils.cli; + +import com.google.common.base.Function; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; + +import java.util.Optional; + +/** + * Base class for command-line option handlers. A handler is a function from a String to a + * commons-cli {@code Option} and may additionally supply a value for an option from a parsed + * command line. (Presumably the String argument is the option's code - confirm against callers.) + * + * @param <OPT_T> the enum type whose constants identify the options + */ public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements Function<String, Option> +{ + /** + * Returns the value associated with the given option on the command line. This base + * implementation ignores both arguments and always returns an empty Optional; subclasses + * may override it. + * + * @param option the option being read + * @param cli the parsed command line + * @return an empty Optional in this base implementation + */ public Optional<Object> getValue(OPT_T option, CommandLine cli) { + return Optional.empty(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java new file mode 100644 index 0000000..92a4129 --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/BaseStellarFunction.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.metron.stellar.dsl; + +import java.util.List; + +/** + * Functions that do not require initialization can extend this class rather than directly implement StellarFunction + */ +public abstract class BaseStellarFunction implements StellarFunction { + public abstract Object apply(List<Object> args); + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + return apply(args); + } + + @Override + public void initialize(Context context) { + + } + + @Override + public boolean isInitialized() { + return true; + } +}
