http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java new file mode 100644 index 0000000..41c31d7 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import static java.lang.String.format; + +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.commons.collections4.ListUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.WallClock; +import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; +import org.apache.metron.stellar.common.StellarStatefulExecutor; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Responsible for building and maintaining a Profile. + * + * One or more messages are applied to the Profile with `apply` and a profile measurement is + * produced by calling `flush`. + * + * Any one instance is responsible only for building the profile for a specific [profile, entity] + * pairing. There will exist many instances, one for each [profile, entity] pair that exists + * within the incoming telemetry data applied to the profile. + */ +public class DefaultProfileBuilder implements ProfileBuilder, Serializable { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The name of the profile. + */ + private String profileName; + + /** + * The name of the entity. + */ + private String entity; + + /** + * The definition of the Profile that the bolt is building. + */ + private ProfileConfig definition; + + /** + * Executes Stellar code and maintains state across multiple invocations. + */ + private StellarStatefulExecutor executor; + + /** + * Has the profile been initialized? + */ + private boolean isInitialized; + + /** + * The duration of each period in milliseconds. + */ + private long periodDurationMillis; + + /** + * A clock is used to tell time; imagine that. + */ + private Clock clock; + + /** + * Use the ProfileBuilder.Builder to create a new ProfileBuilder. + */ + private DefaultProfileBuilder(ProfileConfig definition, + String entity, + Clock clock, + long periodDurationMillis, + Context stellarContext) { + + this.isInitialized = false; + this.definition = definition; + this.profileName = definition.getProfile(); + this.entity = entity; + this.clock = clock; + this.periodDurationMillis = periodDurationMillis; + this.executor = new DefaultStellarStatefulExecutor(); + StellarFunctions.initialize(stellarContext); + this.executor.setContext(stellarContext); + } + + /** + * Apply a message to the profile. + * @param message The message to apply. + */ + @Override + @SuppressWarnings("unchecked") + public void apply(JSONObject message) { + + if(!isInitialized()) { + assign(definition.getInit(), message, "init"); + isInitialized = true; + } + + assign(definition.getUpdate(), message, "update"); + } + + /** + * Flush the Profile. + * + * Completes and emits the ProfileMeasurement. Clears all state in preparation for + * the next window period. + * + * @return Returns the completed profile measurement. + */ + @Override + public ProfileMeasurement flush() { + LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity); + + // execute the 'profile' expression(s) + @SuppressWarnings("unchecked") + Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile"); + + // execute the 'triage' expression(s) + Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions() + .entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> execute(e.getValue(), "result/triage"))); + + // execute the 'groupBy' expression(s) - can refer to value of 'result' expression + List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy"); + + isInitialized = false; + return new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withGroups(groups) + .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS) + .withProfileValue(profileValue) + .withTriageValues(triageValues) + .withDefinition(definition); + } + + /** + * Returns the current value of a variable. + * @param variable The name of the variable. + */ + @Override + public Object valueOf(String variable) { + return executor.getState().get(variable); + } + + @Override + public boolean isInitialized() { + return isInitialized; + } + + @Override + public ProfileConfig getDefinition() { + return definition; + } + + /** + * Executes an expression contained within the profile definition. + * @param expression The expression to execute. + * @param transientState Additional transient state provided to the expression. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + * @return The result of executing the expression. + */ + private Object execute(String expression, Map<String, Object> transientState, String expressionType) { + Object result = null; + + List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType); + if(allResults.size() > 0) { + result = allResults.get(0); + } + + return result; + } + + /** + * Executes an expression contained within the profile definition. + * @param expression The expression to execute. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + * @return The result of executing the expression. + */ + private Object execute(String expression, String expressionType) { + return execute(expression, Collections.emptyMap(), expressionType); + } + + + /** + * Executes a set of expressions whose results need to be assigned to a variable. + * @param expressions Maps the name of a variable to the expression whose result should be assigned to it. + * @param transientState Additional transient state provided to the expression. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + */ + private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) { + try { + + // execute each of the 'update' expressions + MapUtils.emptyIfNull(expressions) + .forEach((var, expr) -> executor.assign(var, expr, transientState)); + + } catch(ParseException e) { + + // make it brilliantly clear that one of the 'update' expressions is bad + String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); + throw new ParseException(msg, e); + } + } + + /** + * Executes the expressions contained within the profile definition. + * @param expressions A list of expressions to execute. + * @param transientState Additional transient state provided to the expressions. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + * @return The result of executing each expression. + */ + private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) { + List<Object> results = new ArrayList<>(); + + try { + ListUtils.emptyIfNull(expressions) + .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class))); + + } catch (Throwable e) { + String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); + throw new ParseException(msg, e); + } + + return results; + } + + /** + * A builder used to construct a new ProfileBuilder. + */ + public static class Builder { + + private ProfileConfig definition; + private String entity; + private long periodDurationMillis; + private Clock clock = new WallClock(); + private Context context; + + public Builder withContext(Context context) { + this.context = context; + return this; + } + + public Builder withClock(Clock clock) { + this.clock = clock; + return this; + } + + /** + * @param definition The profiler definition. + */ + public Builder withDefinition(ProfileConfig definition) { + this.definition = definition; + return this; + } + + /** + * @param entity The name of the entity + */ + public Builder withEntity(String entity) { + this.entity = entity; + return this; + } + + /** + * @param duration The duration of each profile period. + * @param units The units used to specify the duration of the profile period. + */ + public Builder withPeriodDuration(long duration, TimeUnit units) { + this.periodDurationMillis = units.toMillis(duration); + return this; + } + + /** + * @param millis The duration of each profile period in milliseconds. + */ + public Builder withPeriodDurationMillis(long millis) { + this.periodDurationMillis = millis; + return this; + } + + /** + * Construct a ProfileBuilder. + */ + public ProfileBuilder build() { + + if(definition == null) { + throw new IllegalArgumentException("missing profiler definition; got null"); + } + if(StringUtils.isEmpty(entity)) { + throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); + } + + return new DefaultProfileBuilder(definition, entity, clock, periodDurationMillis, context); + } + } +}
http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java new file mode 100644 index 0000000..a60446f --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** + * Distributes a message along a MessageRoute. A MessageRoute will lead to one or + * more ProfileBuilders. + * + * A ProfileBuilder is responsible for maintaining the state of a single profile, + * for a single entity. There will be one ProfileBuilder for each (profile, entity) pair. + * This class ensures that each ProfileBuilder receives the telemetry messages that + * it needs. + */ +public interface MessageDistributor { + + /** + * Distribute a message along a MessageRoute. + * + * @param message The message that needs distributed. + * @param route The message route. + * @param context The Stellar execution context. + * @throws ExecutionException + */ + void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException; + + /** + * Flushes all profiles. Flushes all ProfileBuilders that this distributor is responsible for. + * + * @return The profile measurements; one for each (profile, entity) pair. + */ + List<ProfileMeasurement> flush(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java new file mode 100644 index 0000000..1945671 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.apache.metron.common.configuration.profiler.ProfileConfig; + +/** + * A MessageRoute defines the profile and entity that a telemetry message needs applied to. This + * allows a message to be routed to the profile and entity that needs it. + * + * One telemetry message may need multiple routes. This is the case when a message is needed by + * more than one profile. In this case, there will be multiple MessageRoute objects for a single + * message. + */ +public class MessageRoute { + + /** + * The definition of the profile on this route. + */ + private ProfileConfig profileDefinition; + + /** + * The entity for this route. + */ + private String entity; + + public MessageRoute(ProfileConfig profileDefinition, String entity) { + this.entity = entity; + this.profileDefinition = profileDefinition; + } + + public String getEntity() { + return entity; + } + + public void setEntity(String entity) { + this.entity = entity; + } + + public ProfileConfig getProfileDefinition() { + return profileDefinition; + } + + public void setProfileDefinition(ProfileConfig profileDefinition) { + this.profileDefinition = profileDefinition; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java new file mode 100644 index 0000000..99c98a3 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; + +import java.util.List; + +/** + * Routes incoming telemetry messages. + * + * A single telemetry message may need to take multiple routes. This is the case + * when a message is needed by more than one profile. + */ +public interface MessageRouter { + + /** + * Route a telemetry message. Finds all routes for a given telemetry message. + * + * @param message The telemetry message that needs routed. + * @param config The configuration for the Profiler. + * @param context The Stellar execution context. + * @return A list of all the routes for the message. + */ + List<MessageRoute> route(JSONObject message, ProfilerConfig config, Context context); +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index da8db82..9a5407b 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -20,32 +20,8 @@ package org.apache.metron.profiler; -import static java.lang.String.format; - -import com.google.common.collect.ImmutableMap; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.commons.collections4.ListUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.curator.framework.CuratorFramework; import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.profiler.clock.Clock; -import org.apache.metron.profiler.clock.WallClock; -import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; -import org.apache.metron.stellar.common.StellarStatefulExecutor; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.ParseException; -import org.apache.metron.stellar.dsl.StellarFunctions; import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Responsible for building and maintaining a Profile. @@ -57,85 +33,13 @@ import org.slf4j.LoggerFactory; * pairing. There will exist many instances, one for each [profile, entity] pair that exists * within the incoming telemetry data applied to the profile. */ -public class ProfileBuilder implements Serializable { - - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - /** - * The name of the profile. - */ - private String profileName; - - /** - * The name of the entity. - */ - private String entity; - - /** - * The definition of the Profile that the bolt is building. - */ - private ProfileConfig definition; - - /** - * Executes Stellar code and maintains state across multiple invocations. - */ - private StellarStatefulExecutor executor; - - /** - * Has the profile been initialized? - */ - private boolean isInitialized; - - /** - * The duration of each period in milliseconds. - */ - private long periodDurationMillis; - - /** - * A clock is used to tell time; imagine that. - */ - private Clock clock; - - /** - * Use the ProfileBuilder.Builder to create a new ProfileBuilder. - */ - private ProfileBuilder(ProfileConfig definition, - String entity, - Clock clock, - long periodDurationMillis, - CuratorFramework client, - Map<String, Object> global) { - - this.isInitialized = false; - this.definition = definition; - this.profileName = definition.getProfile(); - this.entity = entity; - this.clock = clock; - this.periodDurationMillis = periodDurationMillis; - this.executor = new DefaultStellarStatefulExecutor(); - Context context = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) - .with(Context.Capabilities.STELLAR_CONFIG, () -> global) - .build(); - StellarFunctions.initialize(context); - this.executor.setContext(context); - } +public interface ProfileBuilder { /** * Apply a message to the profile. * @param message The message to apply. */ - @SuppressWarnings("unchecked") - public void apply(JSONObject message) { - - if(!isInitialized()) { - assign(definition.getInit(), message, "init"); - isInitialized = true; - } - - assign(definition.getUpdate(), message, "update"); - } + void apply(JSONObject message); /** * Flush the Profile. @@ -145,203 +49,24 @@ public class ProfileBuilder implements Serializable { * * @return Returns the completed profile measurement. */ - public ProfileMeasurement flush() { - LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity); - - // execute the 'profile' expression(s) - @SuppressWarnings("unchecked") - Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile"); - - // execute the 'triage' expression(s) - Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions() - .entrySet() - .stream() - .collect(Collectors.toMap( - e -> e.getKey(), - e -> execute(e.getValue(), "result/triage"))); - - // execute the 'groupBy' expression(s) - can refer to value of 'result' expression - List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy"); - - isInitialized = false; - return new ProfileMeasurement() - .withProfileName(profileName) - .withEntity(entity) - .withGroups(groups) - .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS) - .withProfileValue(profileValue) - .withTriageValues(triageValues) - .withDefinition(definition); - } - - /** - * Executes an expression contained within the profile definition. - * @param expression The expression to execute. - * @param transientState Additional transient state provided to the expression. - * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. - * @return The result of executing the expression. - */ - private Object execute(String expression, Map<String, Object> transientState, String expressionType) { - Object result = null; - - List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType); - if(allResults.size() > 0) { - result = allResults.get(0); - } - - return result; - } - - /** - * Executes an expression contained within the profile definition. - * @param expression The expression to execute. - * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. - * @return The result of executing the expression. - */ - private Object execute(String expression, String expressionType) { - return execute(expression, Collections.emptyMap(), expressionType); - } - - - /** - * Executes a set of expressions whose results need to be assigned to a variable. - * @param expressions Maps the name of a variable to the expression whose result should be assigned to it. - * @param transientState Additional transient state provided to the expression. - * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. - */ - private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) { - try { - - // execute each of the 'update' expressions - MapUtils.emptyIfNull(expressions) - .forEach((var, expr) -> executor.assign(var, expr, transientState)); - - } catch(ParseException e) { - - // make it brilliantly clear that one of the 'update' expressions is bad - String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); - throw new ParseException(msg, e); - } - } + ProfileMeasurement flush(); /** - * Executes the expressions contained within the profile definition. - * @param expressions A list of expressions to execute. - * @param transientState Additional transient state provided to the expressions. - * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. - * @return The result of executing each expression. + * Has the ProfileBuilder been initialized? + * @return True, if initialization has occurred. False, otherwise. */ - private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) { - List<Object> results = new ArrayList<>(); - - try { - ListUtils.emptyIfNull(expressions) - .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class))); - - } catch (Throwable e) { - String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); - throw new ParseException(msg, e); - } - - return results; - } + boolean isInitialized(); /** - * Returns the current value of a variable. - * @param variable The name of the variable. + * Returns the definition of the profile being built. + * @return */ - public Object valueOf(String variable) { - return executor.getState().get(variable); - } - - public boolean isInitialized() { - return isInitialized; - } - - public ProfileConfig getDefinition() { - return definition; - } + ProfileConfig getDefinition(); /** - * A builder used to construct a new ProfileBuilder. + * Returns the value of a variable being maintained by the builder. + * @param variable The variable name. + * @return The value of the variable. */ - public static class Builder { - - private ProfileConfig definition; - private String entity; - private long periodDurationMillis; - private CuratorFramework zookeeperClient; - private Map<String, Object> global; - private Clock clock = new WallClock(); - - public Builder withClock(Clock clock) { - this.clock = clock; - return this; - } - - /** - * @param definition The profiler definition. - */ - public Builder withDefinition(ProfileConfig definition) { - this.definition = definition; - return this; - } - - /** - * @param entity The name of the entity - */ - public Builder withEntity(String entity) { - this.entity = entity; - return this; - } - - /** - * @param duration The duration of each profile period. - * @param units The units used to specify the duration of the profile period. - */ - public Builder withPeriodDuration(long duration, TimeUnit units) { - this.periodDurationMillis = units.toMillis(duration); - return this; - } - - /** - * @param millis The duration of each profile period in milliseconds. - */ - public Builder withPeriodDurationMillis(long millis) { - this.periodDurationMillis = millis; - return this; - } - - /** - * @param zookeeperClient The zookeeper client. - */ - public Builder withZookeeperClient(CuratorFramework zookeeperClient) { - this.zookeeperClient = zookeeperClient; - return this; - } - - /** - * @param global The global configuration. - */ - public Builder withGlobalConfiguration(Map<String, Object> global) { - // TODO how does the profile builder ever seen a global that has been update in zookeeper? - this.global = global; - return this; - } - - /** - * Construct a ProfileBuilder. - */ - public ProfileBuilder build() { - - if(definition == null) { - throw new IllegalArgumentException("missing profiler definition; got null"); - } - if(StringUtils.isEmpty(entity)) { - throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); - } - - return new ProfileBuilder(definition, entity, clock, periodDurationMillis, zookeeperClient, global); - } - } + Object valueOf(String variable); } http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java new file mode 100644 index 0000000..cf034c8 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** + * A stand alone version of the Profiler that does not require a + * distributed execution environment like Apache Storm. + */ +public class StandAloneProfiler { + + /** + * The Stellar execution context. + */ + private Context context; + + /** + * The configuration for the Profiler. + */ + private ProfilerConfig config; + + /** + * The message router. + */ + private MessageRouter router; + + /** + * The message distributor. + */ + private MessageDistributor distributor; + + public StandAloneProfiler(ProfilerConfig config, long periodDurationMillis, Context context) { + this.context = context; + this.config = config; + this.router = new DefaultMessageRouter(context); + // the period TTL does not matter in this context + this.distributor = new DefaultMessageDistributor(periodDurationMillis, Long.MAX_VALUE); + } + + /** + * Apply a message to a set of profiles. + * @param message The message to apply. + * @throws ExecutionException + */ + public void apply(JSONObject message) throws ExecutionException { + + List<MessageRoute> routes = router.route(message, config, context); + for(MessageRoute route : routes) { + distributor.distribute(message, route, context); + } + } + + /** + * Flush the set of profiles. + * @return A ProfileMeasurement for each (Profile, Entity) pair. + */ + public List<ProfileMeasurement> flush() { + return distributor.flush(); + } + + public ProfilerConfig getConfig() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java new file mode 100644 index 0000000..ff4c289 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class DefaultMessageDistributorTest { + + /** + * { + * "ip_src_addr": "10.0.0.1", + * "value": "22" + * } + */ + @Multiline + private String inputOne; + private JSONObject messageOne; + + /** + * { + * "ip_src_addr": "10.0.0.2", + * "value": "22" + * } + */ + @Multiline + private String inputTwo; + private JSONObject messageTwo; + + /** + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + */ + @Multiline + private String profileOne; + + /** + * { + * "profile": "profile-two", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + */ + @Multiline + private String profileTwo; + + private DefaultMessageDistributor distributor; + private Context context; + + @Before + public void setup() throws Exception { + context = Context.EMPTY_CONTEXT(); + JSONParser parser = new JSONParser(); + messageOne = (JSONObject) parser.parse(inputOne); + messageTwo = (JSONObject) parser.parse(inputTwo); + distributor = new DefaultMessageDistributor( + TimeUnit.MINUTES.toMillis(15), + TimeUnit.MINUTES.toMillis(30)); + } + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } + + /** + * Tests that one message can be distributed to one profile. + */ + @Test + public void testDistribute() throws Exception { + ProfileConfig definition = createDefinition(profileOne); + String entity = (String) messageOne.get("ip_src_addr"); + MessageRoute route = new MessageRoute(definition, entity); + + // distribute one message + distributor.distribute(messageOne, route, context); + + // expect one measurement coming from one profile + List<ProfileMeasurement> measurements = distributor.flush(); + assertEquals(1, measurements.size()); + ProfileMeasurement m = measurements.get(0); + assertEquals(definition.getProfile(), m.getProfileName()); + assertEquals(entity, m.getEntity()); + } + + @Test + public void testDistributeWithTwoProfiles() throws Exception { + + // distribute one message to the first profile + String entity = (String) messageOne.get("ip_src_addr"); + distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entity), context); + + // distribute another message to the second profile, but same entity + distributor.distribute(messageOne, new MessageRoute(createDefinition(profileTwo), entity), context); + + // expect 2 measurements; 1 for each profile + List<ProfileMeasurement> measurements = distributor.flush(); + assertEquals(2, measurements.size()); + } + + @Test + public void testDistributeWithTwoEntities() throws Exception { + + // distribute one message + String entityOne = (String) messageOne.get("ip_src_addr"); + distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entityOne), context); + + // distribute another message with a different entity + String entityTwo = (String) messageTwo.get("ip_src_addr"); + distributor.distribute(messageTwo, new MessageRoute(createDefinition(profileTwo), entityTwo), context); + + // expect 2 measurements; 1 for each entity + List<ProfileMeasurement> measurements = distributor.flush(); + assertEquals(2, measurements.size()); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java new file mode 100644 index 0000000..d0c0fbc --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class DefaultMessageRouterTest { + + /** + * { + * "ip_src_addr": "10.0.0.1", + * "value": "22" + * } + */ + @Multiline + private String inputOne; + private JSONObject messageOne; + + /** + * { + * "ip_src_addr": "10.0.0.2", + * "value": "22" + * } + */ + @Multiline + private String inputTwo; + private JSONObject messageTwo; + + /** + * { + * "profiles": [ ] + * } + */ + @Multiline + private String noProfiles; + + /** + * { + * "profiles": [ + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + * ] + * } + */ + @Multiline + private String oneProfile; + + /** + * { + * "profiles": [ + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * }, + * { + * "profile": "profile-two", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + * ] + * } + */ + @Multiline + private String twoProfiles; + + /** + * { + * "profiles": [ + * { + * "profile": "profile-one", + * "onlyif": "false", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + * ] + * } + */ + @Multiline + private String exclusiveProfile; + + private DefaultMessageRouter router; + private Context context; + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfilerConfig createConfig(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfilerConfig.class); + } + + @Before + public void setup() throws Exception { + this.router = new DefaultMessageRouter(Context.EMPTY_CONTEXT()); + this.context = Context.EMPTY_CONTEXT(); + JSONParser parser = new JSONParser(); + this.messageOne = (JSONObject) parser.parse(inputOne); + this.messageTwo = (JSONObject) parser.parse(inputTwo); + } + + @Test + public void testWithOneRoute() throws Exception { + List<MessageRoute> routes = router.route(messageOne, createConfig(oneProfile), context); + + assertEquals(1, routes.size()); + MessageRoute route1 = routes.get(0); + assertEquals(messageOne.get("ip_src_addr"), route1.getEntity()); + assertEquals("profile-one", route1.getProfileDefinition().getProfile()); + } + + @Test + public void testWithNoRoutes() throws Exception { + List<MessageRoute> routes = router.route(messageOne, createConfig(noProfiles), context); + assertEquals(0, routes.size()); + } + + @Test + public void testWithTwoRoutes() throws Exception { + List<MessageRoute> routes = router.route(messageOne, createConfig(twoProfiles), context); + + assertEquals(2, routes.size()); + { + MessageRoute route1 = routes.get(0); + assertEquals(messageOne.get("ip_src_addr"), route1.getEntity()); + assertEquals("profile-one", route1.getProfileDefinition().getProfile()); + } + { + MessageRoute route2 = routes.get(1); + assertEquals(messageOne.get("ip_src_addr"), route2.getEntity()); + assertEquals("profile-two", route2.getProfileDefinition().getProfile()); + } + } + + /** + * The 'onlyif' condition should exclude some messages from being routed to a profile. + */ + @Test + public void testExclusiveProfile() throws Exception { + List<MessageRoute> routes = router.route(messageOne, createConfig(exclusiveProfile), context); + assertEquals(0, routes.size()); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java new file mode 100644 index 0000000..fa5c19c --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java @@ -0,0 +1,472 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.clock.FixedClock; +import org.apache.metron.stellar.dsl.Context; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.metron.stellar.common.utils.ConversionUtils.convert; +import static org.junit.Assert.assertEquals; + +/** + * Tests the ProfileBuilder class. + */ +public class DefaultProfileBuilderTest { + + /** + * { + * "ip_src_addr": "10.0.0.1", + * "ip_dst_addr": "10.0.0.20", + * "value": 100 + * } + */ + @Multiline + private String input; + private JSONObject message; + private ProfileBuilder builder; + private ProfileConfig definition; + + @Before + public void setup() throws Exception { + message = (JSONObject) new JSONParser().parse(input); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "100", + * "y": "200" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testInitProfile; + + /** + * Ensure that the 'init' block is executed correctly. + */ + @Test + public void testInit() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate that x = 100, y = 200 + assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class)); + } + + /** + * The 'init' block is executed only when the first message is received. If no message + * has been received, the 'init' block will not be executed. + */ + @Test + public void testInitWithNoMessage() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + ProfileMeasurement m = builder.flush(); + + // validate that x = 0 and y = 0 as no initialization occurred + assertEquals(0, (int) convert(m.getProfileValue(), Integer.class)); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "0", + * "y": "0" + * }, + * "update": { + * "x": "x + 1", + * "y": "y + 2" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testUpdateProfile; + + /** + * Ensure that the 'update' expressions are executed for each message applied to the profile. + */ + @Test + public void testUpdate() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + int count = 10; + for(int i=0; i<count; i++) { + builder.apply(message); + } + ProfileMeasurement m = builder.flush(); + + // validate that x=0, y=0 then x+=1, y+=2 for each message + assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class)); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { "x": "100" }, + * "result": "x" + * } + */ + @Multiline + private String testResultProfile; + + /** + * Ensure that the result expression is executed on a flush. + */ + @Test + public void testResult() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(100, (int) convert(m.getProfileValue(), Integer.class)); + } + + /** + * Ensure that time advances properly on each flush. + */ + @Test + public void testProfilePeriodOnFlush() throws Exception { + // setup + FixedClock clock = new FixedClock(); + clock.setTime(100); + + definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .withClock(clock) + .build(); + + { + // apply a message and flush + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate the profile period + ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + assertEquals(expected, m.getPeriod()); + } + { + // advance time by at least one period - 10 minutes + clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10)); + + // apply a message and flush again + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate the profile period + ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + assertEquals(expected, m.getPeriod()); + } + } + + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { "x": "100" }, + * "groupBy": ["x * 1", "x * 2"], + * "result": "100.0" + * } + */ + @Multiline + private String testGroupByProfile; + + /** + * Ensure that the 'groupBy' expression is executed correctly. + */ + @Test + public void testGroupBy() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(2, m.getGroups().size()); + assertEquals(100, m.getGroups().get(0)); + assertEquals(200, m.getGroups().get(1)); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "if exists(x) then x else 0", + * "y": "if exists(y) then y else 0" + * }, + * "update": { + * "x": "x + 1", + * "y": "y + 2" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testFlushProfile; + + @Test + public void testFlushDoesNotClearsState() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute - accumulate some state then flush it + int count = 10; + for(int i=0; i<count; i++) { + builder.apply(message); + } + builder.flush(); + + // apply another message to accumulate new state, then flush again to validate original state was cleared + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(33, m.getProfileValue()); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "0", + * "y": "0" + * }, + * "update": { + * "x": "x + 1", + * "y": "y + 2" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testFlushProfileWithNaiveInit; + + @Test + public void testFlushDoesNotClearsStateButInitDoes() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute - accumulate some state then flush it + int count = 10; + for(int i=0; i<count; i++) { + builder.apply(message); + } + builder.flush(); + + // apply another message to accumulate new state, then flush again to validate original state was cleared + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(3, m.getProfileValue()); + } + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "result": "100" + * } + */ + @Multiline + private String testEntityProfile; + + /** + * Ensure that the entity is correctly set on the resulting profile measurements. + */ + @Test + public void testEntity() throws Exception { + // setup + final String entity = "10.0.0.1"; + definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity(entity) + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(entity, m.getEntity()); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "100" + * }, + * "result": { + * "profile": "x" + * } + * } + */ + @Multiline + private String testResultWithProfileExpression; + + /** + * Ensure that the result expression is executed on a flush. + */ + @Test + public void testResultWithProfileExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(100, m.getProfileValue()); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "100" + * }, + * "result": { + * "profile": "x", + * "triage": { + * "zero": "x - 100", + * "hundred": "x" + * } + * } + * } + */ + @Multiline + private String testResultWithTriageExpression; + + /** + * Ensure that the result expression is executed on a flush. + */ + @Test + public void testResultWithTriageExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate + assertEquals(0, m.getTriageValues().get("zero")); + assertEquals(100, m.getTriageValues().get("hundred")); + assertEquals(100, m.getProfileValue()); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java deleted file mode 100644 index aa632e4..0000000 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler; - -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.profiler.clock.FixedClock; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.junit.Before; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import static org.apache.metron.stellar.common.utils.ConversionUtils.convert; -import static org.junit.Assert.assertEquals; - -/** - * Tests the ProfileBuilder class. - */ -public class ProfileBuilderTest { - - /** - * { - * "ip_src_addr": "10.0.0.1", - * "ip_dst_addr": "10.0.0.20", - * "value": 100 - * } - */ - @Multiline - private String input; - private JSONObject message; - private ProfileBuilder builder; - private ProfileConfig definition; - - @Before - public void setup() throws Exception { - message = (JSONObject) new JSONParser().parse(input); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "100", - * "y": "200" - * }, - * "result": "x + y" - * } - */ - @Multiline - private String testInitProfile; - - /** - * Ensure that the 'init' block is executed correctly. - */ - @Test - public void testInit() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate that x = 100, y = 200 - assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class)); - } - - /** - * The 'init' block is executed only when the first message is received. If no message - * has been received, the 'init' block will not be executed. - */ - @Test - public void testInitWithNoMessage() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - ProfileMeasurement m = builder.flush(); - - // validate that x = 0 and y = 0 as no initialization occurred - assertEquals(0, (int) convert(m.getProfileValue(), Integer.class)); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "0", - * "y": "0" - * }, - * "update": { - * "x": "x + 1", - * "y": "y + 2" - * }, - * "result": "x + y" - * } - */ - @Multiline - private String testUpdateProfile; - - /** - * Ensure that the 'update' expressions are executed for each message applied to the profile. - */ - @Test - public void testUpdate() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - int count = 10; - for(int i=0; i<count; i++) { - builder.apply(message); - } - ProfileMeasurement m = builder.flush(); - - // validate that x=0, y=0 then x+=1, y+=2 for each message - assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class)); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { "x": "100" }, - * "result": "x" - * } - */ - @Multiline - private String testResultProfile; - - /** - * Ensure that the result expression is executed on a flush. - */ - @Test - public void testResult() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(100, (int) convert(m.getProfileValue(), Integer.class)); - } - - /** - * Ensure that time advances properly on each flush. - */ - @Test - public void testProfilePeriodOnFlush() throws Exception { - // setup - FixedClock clock = new FixedClock(); - clock.setTime(100); - - definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .withClock(clock) - .build(); - - { - // apply a message and flush - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate the profile period - ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); - assertEquals(expected, m.getPeriod()); - } - { - // advance time by at least one period - 10 minutes - clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10)); - - // apply a message and flush again - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate the profile period - ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); - assertEquals(expected, m.getPeriod()); - } - } - - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { "x": "100" }, - * "groupBy": ["x * 1", "x * 2"], - * "result": "100.0" - * } - */ - @Multiline - private String testGroupByProfile; - - /** - * Ensure that the 'groupBy' expression is executed correctly. - */ - @Test - public void testGroupBy() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(2, m.getGroups().size()); - assertEquals(100, m.getGroups().get(0)); - assertEquals(200, m.getGroups().get(1)); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "if exists(x) then x else 0", - * "y": "if exists(y) then y else 0" - * }, - * "update": { - * "x": "x + 1", - * "y": "y + 2" - * }, - * "result": "x + y" - * } - */ - @Multiline - private String testFlushProfile; - - @Test - public void testFlushDoesNotClearsState() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - accumulate some state then flush it - int count = 10; - for(int i=0; i<count; i++) { - builder.apply(message); - } - builder.flush(); - - // apply another message to accumulate new state, then flush again to validate original state was cleared - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(33, m.getProfileValue()); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "0", - * "y": "0" - * }, - * "update": { - * "x": "x + 1", - * "y": "y + 2" - * }, - * "result": "x + y" - * } - */ - @Multiline - private String testFlushProfileWithNaiveInit; - - @Test - public void testFlushDoesNotClearsStateButInitDoes() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - accumulate some state then flush it - int count = 10; - for(int i=0; i<count; i++) { - builder.apply(message); - } - builder.flush(); - - // apply another message to accumulate new state, then flush again to validate original state was cleared - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(3, m.getProfileValue()); - } - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "result": "100" - * } - */ - @Multiline - private String testEntityProfile; - - /** - * Ensure that the entity is correctly set on the resulting profile measurements. - */ - @Test - public void testEntity() throws Exception { - // setup - final String entity = "10.0.0.1"; - definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity(entity) - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(entity, m.getEntity()); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "100" - * }, - * "result": { - * "profile": "x" - * } - * } - */ - @Multiline - private String testResultWithProfileExpression; - - /** - * Ensure that the result expression is executed on a flush. - */ - @Test - public void testResultWithProfileExpression() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(100, m.getProfileValue()); - } - - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": { - * "x": "100" - * }, - * "result": { - * "profile": "x", - * "triage": { - * "zero": "x - 100", - * "hundred": "x" - * } - * } - * } - */ - @Multiline - private String testResultWithTriageExpression; - - /** - * Ensure that the result expression is executed on a flush. - */ - @Test - public void testResultWithTriageExpression() throws Exception { - // setup - definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class); - builder = new ProfileBuilder.Builder() - .withDefinition(definition) - .withEntity("10.0.0.1") - .withPeriodDuration(10, TimeUnit.MINUTES) - .build(); - - // execute - builder.apply(message); - ProfileMeasurement m = builder.flush(); - - // validate - assertEquals(0, m.getTriageValues().get("zero")); - assertEquals(100, m.getTriageValues().get("hundred")); - assertEquals(100, m.getProfileValue()); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 1323089..3c8d875 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -20,22 +20,13 @@ package org.apache.metron.profiler.bolt; -import static java.lang.String.format; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.metron.common.bolt.ConfiguredProfilerBolt; import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.stellar.common.utils.ConversionUtils; -import org.apache.metron.profiler.ProfileBuilder; +import org.apache.metron.profiler.DefaultMessageDistributor; +import org.apache.metron.profiler.MessageRoute; import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.clock.WallClock; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -47,6 +38,15 @@ import org.json.simple.parser.JSONParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; + /** * A bolt that is responsible for building a Profile. * @@ -75,9 +75,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { private long profileTimeToLiveMillis; /** - * Maintains the state of a profile which is unique to a profile/entity pair. + * Distributes messages to the profile builders. */ - private transient Cache<String, ProfileBuilder> profileCache; + private DefaultMessageDistributor messageDistributor; /** * Parses JSON messages. @@ -122,10 +122,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { } this.collector = collector; this.parser = new JSONParser(); - this.profileCache = CacheBuilder - .newBuilder() - .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) - .build(); + this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis); } @Override @@ -138,6 +135,15 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer)); } + private Context getStellarContext() { + Map<String, Object> global = getConfigurations().getGlobalConfig(); + return new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) + .with(Context.Capabilities.STELLAR_CONFIG, () -> global) + .build(); + } + /** * Expect to receive either a tick tuple or a telemetry message that needs applied * to a profile. @@ -148,7 +154,6 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { try { if(TupleUtils.isTick(input)) { handleTick(); - profileCache.cleanUp(); } else { handleMessage(input); @@ -169,51 +174,23 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { */ private void handleMessage(Tuple input) throws ExecutionException { JSONObject message = getField("message", input, JSONObject.class); - getBuilder(input).apply(message); + ProfileConfig definition = getField("profile", input, ProfileConfig.class); + String entity = getField("entity", input, String.class); + MessageRoute route = new MessageRoute(definition, entity); + + messageDistributor.distribute(message, route, getStellarContext()); } /** * Handles a tick tuple. */ private void handleTick() { - profileCache.asMap().forEach((key, profileBuilder) -> { - if(profileBuilder.isInitialized()) { - - // flush the profile - ProfileMeasurement measurement = profileBuilder.flush(); - - // forward the measurement to each destination handler - destinationHandlers.forEach(handler -> handler.emit(measurement, collector)); - } - }); - } + List<ProfileMeasurement> measurements = messageDistributor.flush(); - /** - * Builds the key that is used to lookup the ProfileState within the cache. - * @param tuple A tuple. - */ - private String cacheKey(Tuple tuple) { - return format("%s:%s", - getField("profile", tuple, ProfileConfig.class), - getField("entity", tuple, String.class)); - } - - /** - * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile. If none exists, - * one will be created and returned. - * @param tuple The tuple. - */ - protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException { - return profileCache.get( - cacheKey(tuple), - () -> new ProfileBuilder.Builder() - .withDefinition(getField("profile", tuple, ProfileConfig.class)) - .withEntity(getField("entity", tuple, String.class)) - .withPeriodDurationMillis(periodDurationMillis) - .withGlobalConfiguration(getConfigurations().getGlobalConfig()) - .withZookeeperClient(client) - .withClock(new WallClock()) - .build()); + // forward the measurements to each destination handler + for(ProfileMeasurement m : measurements ) { + destinationHandlers.forEach(handler -> handler.emit(m, collector)); + } } /** @@ -255,4 +232,8 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt { this.destinationHandlers.add(handler); return this; } + + public DefaultMessageDistributor getMessageDistributor() { + return messageDistributor; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index 255069a..a453c66 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -20,16 +20,12 @@ package org.apache.metron.profiler.bolt; -import java.io.UnsupportedEncodingException; -import java.lang.invoke.MethodHandles; -import java.util.Map; import org.apache.metron.common.bolt.ConfiguredProfilerBolt; -import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; -import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; -import org.apache.metron.stellar.common.StellarStatefulExecutor; +import org.apache.metron.profiler.MessageRouter; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.DefaultMessageRouter; import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -42,6 +38,11 @@ import org.json.simple.parser.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.UnsupportedEncodingException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; + /** * The bolt responsible for filtering incoming messages and directing * each to the one or more bolts responsible for building a Profile. Each @@ -59,9 +60,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { private transient JSONParser parser; /** - * Executes Stellar code. + * The router responsible for routing incoming messages. */ - private StellarStatefulExecutor executor; + private MessageRouter router; /** * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt. @@ -75,18 +76,16 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { super.prepare(stormConf, context, collector); this.collector = collector; this.parser = new JSONParser(); - this.executor = new DefaultStellarStatefulExecutor(); - initializeStellar(); + this.router = new DefaultMessageRouter(getStellarContext()); } - protected void initializeStellar() { - Context context = new Context.Builder() + private Context getStellarContext() { + Map<String, Object> global = getConfigurations().getGlobalConfig(); + return new Context.Builder() .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) - .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) + .with(Context.Capabilities.STELLAR_CONFIG, () -> global) .build(); - StellarFunctions.initialize(context); - executor.setContext(context); } @Override @@ -104,7 +103,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { } private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException { - // retrieve the input message byte[] data = input.getBinary(0); JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8")); @@ -113,9 +111,10 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { ProfilerConfig config = getProfilerConfig(); if(config != null) { - // apply the message to each of the profile definitions - for (ProfileConfig profile: config.getProfiles()) { - applyProfile(profile, input, message); + // emit a message for each 'route' + List<MessageRoute> routes = router.route(message, config, getStellarContext()); + for(MessageRoute route : routes) { + collector.emit(input, new Values(route.getEntity(), route.getProfileDefinition(), message)); } } else { @@ -124,27 +123,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { } /** - * Applies a message to a Profile definition. - * @param profile The profile definition. - * @param input The input tuple that delivered the message. - * @param message The message that may be needed by the profile. - */ - private void applyProfile(ProfileConfig profile, Tuple input, JSONObject message) throws ParseException, UnsupportedEncodingException { - @SuppressWarnings("unchecked") - Map<String, Object> state = (Map<String, Object>)message; - - // is this message needed by this profile? - if (executor.execute(profile.getOnlyif(), state, Boolean.class)) { - - // what is the name of the entity in this message? - String entity = executor.execute(profile.getForeach(), state, String.class); - - // emit a message for the bolt responsible for building this profile - collector.emit(input, new Values(entity, profile, message)); - } - } - - /** * Each emitted tuple contains the following fields. * <p> * <ol> @@ -159,11 +137,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { declarer.declare(new Fields("entity", "profile", "message")); } - public StellarStatefulExecutor getExecutor() { - return executor; - } - - public void setExecutor(StellarStatefulExecutor executor) { - this.executor = executor; + protected MessageRouter getMessageRouter() { + return router; } }
