Repository: metron Updated Branches: refs/heads/feature/METRON-1699-create-batch-profiler 4fb920167 -> 5eff97fbe
METRON-1704 Message Timestamp Logic Should be Shared (nickwallen) closes apache/metron#1146 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5eff97fb Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5eff97fb Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5eff97fb Branch: refs/heads/feature/METRON-1699-create-batch-profiler Commit: 5eff97fbe4a99b4d9fdec1010cfa3358cf182ddd Parents: 4fb9201 Author: nickwallen <[email protected]> Authored: Thu Aug 16 08:26:03 2018 -0400 Committer: nickallen <[email protected]> Committed: Thu Aug 16 08:26:03 2018 -0400 ---------------------------------------------------------------------- .../profiler/DefaultMessageDistributor.java | 6 +- .../metron/profiler/DefaultMessageRouter.java | 34 +++++++++-- .../metron/profiler/MessageDistributor.java | 4 +- .../apache/metron/profiler/MessageRoute.java | 64 +++++++++++++++++++- .../metron/profiler/StandAloneProfiler.java | 26 +++----- .../profiler/DefaultMessageDistributorTest.java | 32 +++++----- .../profiler/DefaultMessageRouterTest.java | 55 +++++++++++++++++ .../profiler/bolt/ProfileBuilderBolt.java | 4 +- .../profiler/bolt/ProfileSplitterBolt.java | 37 ++++------- .../profiler/bolt/ProfileBuilderBoltTest.java | 2 +- .../profiler/bolt/ProfileSplitterBoltTest.java | 5 +- 11 files changed, 189 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index c926a70..d950b07 100644 --- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -148,16 +148,14 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab /** * Distribute a message along a MessageRoute. * - * @param message The message that needs distributed. - * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. */ @Override - public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) { + public void distribute(MessageRoute route, Context context) { try { ProfileBuilder builder = getBuilder(route, context); - builder.apply(message, timestamp); + builder.apply(route.getMessage(), route.getTimestamp()); } catch(ExecutionException e) { LOG.error("Unexpected error", e); http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java index 19bfa8c..21ff2b1 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java @@ -22,6 +22,9 @@ package org.apache.metron.profiler; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.ClockFactory; 
+import org.apache.metron.profiler.clock.DefaultClockFactory; import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; import org.apache.metron.stellar.common.StellarStatefulExecutor; import org.apache.metron.stellar.dsl.Context; @@ -54,10 +57,16 @@ public class DefaultMessageRouter implements MessageRouter, Serializable { */ private StellarStatefulExecutor executor; + /** + * Responsible for creating the {@link Clock}. + */ + private ClockFactory clockFactory; + public DefaultMessageRouter(Context context) { this.executor = new DefaultStellarStatefulExecutor(); StellarFunctions.initialize(context); executor.setContext(context); + clockFactory = new DefaultClockFactory(); } /** @@ -74,7 +83,8 @@ public class DefaultMessageRouter implements MessageRouter, Serializable { // attempt to route the message to each of the profiles for (ProfileConfig profile: config.getProfiles()) { - Optional<MessageRoute> route = routeToProfile(message, profile); + Clock clock = clockFactory.createClock(config); + Optional<MessageRoute> route = routeToProfile(message, profile, clock); route.ifPresent(routes::add); } @@ -87,20 +97,24 @@ public class DefaultMessageRouter implements MessageRouter, Serializable { * @param profile The profile that may need the message. * @return A MessageRoute if the message is needed by the profile. */ - private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile) { + private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile, Clock clock) { Optional<MessageRoute> route = Optional.empty(); // allow the profile to access the fields defined within the message @SuppressWarnings("unchecked") final Map<String, Object> state = (Map<String, Object>) message; - try { // is this message needed by this profile? if (executor.execute(profile.getOnlyif(), state, Boolean.class)) { - // what is the name of the entity in this message? 
- String entity = executor.execute(profile.getForeach(), state, String.class); - route = Optional.of(new MessageRoute(profile, entity)); + // what time is it? could be either system or event time + Optional<Long> timestamp = clock.currentTimeMillis(message); + if(timestamp.isPresent()) { + + // what is the name of the entity in this message? + String entity = executor.execute(profile.getForeach(), state, String.class); + route = Optional.of(new MessageRoute(profile, entity, message, timestamp.get())); + } } } catch(Throwable e) { @@ -111,4 +125,12 @@ public class DefaultMessageRouter implements MessageRouter, Serializable { return route; } + + public void setExecutor(StellarStatefulExecutor executor) { + this.executor = executor; + } + + public void setClockFactory(ClockFactory clockFactory) { + this.clockFactory = clockFactory; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java index ea5be0f..b164207 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java @@ -43,12 +43,10 @@ public interface MessageDistributor { /** * Distribute a message along a {@link MessageRoute}. * - * @param message The message that needs distributed. - * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. 
*/ - void distribute(JSONObject message, long timestamp, MessageRoute route, Context context); + void distribute(MessageRoute route, Context context); /** * Flush all active profiles. http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java index 680e4e8..e76b897 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -20,7 +20,11 @@ package org.apache.metron.profiler; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.json.simple.JSONObject; import java.io.Serializable; @@ -48,14 +52,26 @@ public class MessageRoute implements Serializable { private String entity; /** + * The message taking this route. + */ + private JSONObject message; + + /** + * The timestamp of the message. + */ + private Long timestamp; + + /** * Create a {@link MessageRoute}. * * @param profileDefinition The profile definition. - * @param entity The entity. + * @param entity The entity. 
*/ - public MessageRoute(ProfileConfig profileDefinition, String entity) { + public MessageRoute(ProfileConfig profileDefinition, String entity, JSONObject message, Long timestamp) { this.entity = entity; this.profileDefinition = profileDefinition; + this.message = message; + this.timestamp = timestamp; } public String getEntity() { @@ -73,4 +89,48 @@ public class MessageRoute implements Serializable { public void setProfileDefinition(ProfileConfig profileDefinition) { this.profileDefinition = profileDefinition; } + + public JSONObject getMessage() { + return message; + } + + public void setMessage(JSONObject message) { + this.message = message; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MessageRoute that = (MessageRoute) o; + return new EqualsBuilder() + .append(profileDefinition, that.profileDefinition) + .append(entity, that.entity) + .append(message, that.message) + .append(timestamp, that.timestamp) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(profileDefinition) + .append(entity) + .append(message) + .append(timestamp) + .toHashCode(); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java index f79efe6..befa296 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java +++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java @@ -112,26 +112,14 @@ public class StandAloneProfiler { * @param message The message to apply. */ public void apply(JSONObject message) { - - // what time is it? - Clock clock = clockFactory.createClock(config); - Optional<Long> timestamp = clock.currentTimeMillis(message); - - // can only route the message, if we have a timestamp - if(timestamp.isPresent()) { - - // route the message to the correct profile builders - List<MessageRoute> routes = router.route(message, config, context); - for (MessageRoute route : routes) { - distributor.distribute(message, timestamp.get(), route, context); - } - - routeCount += routes.size(); - messageCount += 1; - - } else { - LOG.warn("No timestamp available for the message. The message will be ignored."); + // route the message to the correct profile builders + List<MessageRoute> routes = router.route(message, config, context); + for (MessageRoute route : routes) { + distributor.distribute(route, context); } + + routeCount += routes.size(); + messageCount += 1; } /** http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index ea9c5c6..48161e2 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -123,10 +123,10 @@ public class DefaultMessageDistributorTest { long timestamp = 100; ProfileConfig 
definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, timestamp); // distribute one message and flush - distributor.distribute(messageOne, timestamp, route, context); + distributor.distribute(route, context); List<ProfileMeasurement> measurements = distributor.flush(); // expect one measurement coming from one profile @@ -144,12 +144,12 @@ public class DefaultMessageDistributorTest { String entity = (String) messageOne.get("ip_src_addr"); // distribute one message to the first profile - MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity); - distributor.distribute(messageOne, timestamp, routeOne, context); + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity, messageOne, timestamp); + distributor.distribute(routeOne, context); // distribute another message to the second profile, but same entity - MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity); - distributor.distribute(messageOne, timestamp, routeTwo, context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity, messageOne, timestamp); + distributor.distribute(routeTwo, context); // expect 2 measurements; 1 for each profile List<ProfileMeasurement> measurements = distributor.flush(); @@ -164,13 +164,13 @@ public class DefaultMessageDistributorTest { // distribute one message String entityOne = (String) messageOne.get("ip_src_addr"); - MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne); - distributor.distribute(messageOne, timestamp, routeOne, context); + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne, messageOne, timestamp); + distributor.distribute(routeOne, context); // distribute another message with a different entity String entityTwo = (String) 
messageTwo.get("ip_src_addr"); - MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo); - distributor.distribute(messageTwo, timestamp, routeTwo, context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo, messageTwo, timestamp); + distributor.distribute(routeTwo, context); // expect 2 measurements; 1 for each entity List<ProfileMeasurement> measurements = distributor.flush(); @@ -190,7 +190,7 @@ public class DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -198,7 +198,7 @@ public class DefaultMessageDistributorTest { ticker); // distribute one message - distributor.distribute(messageOne, 1000000, route, context); + distributor.distribute(route, context); // advance time to just shy of the profile TTL ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS); @@ -220,7 +220,7 @@ public class DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -228,7 +228,7 @@ public class DefaultMessageDistributorTest { ticker); // distribute one message - distributor.distribute(messageOne, 100000, route, context); + distributor.distribute(route, context); // advance time to just beyond the period duration ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS); @@ -251,7 +251,7 @@ public class 
DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -259,7 +259,7 @@ public class DefaultMessageDistributorTest { ticker); // distribute one message - distributor.distribute(messageOne, 1000000, route, context); + distributor.distribute(route, context); // advance time a couple of hours ticker.advanceTime(2, HOURS); http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java index 534f155..f583c30 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java @@ -57,6 +57,17 @@ public class DefaultMessageRouterTest { /** * { + * "ip_src_addr": "10.0.0.1", + * "value": "22", + * "timestamp": 1531250226659 + * } + */ + @Multiline + private String inputWithTimestamp; + private JSONObject messageWithTimestamp; + + /** + * { * "profiles": [ ] * } */ @@ -175,6 +186,23 @@ public class DefaultMessageRouterTest { @Multiline private String goodAndBad; + /** + * { + * "profiles": [ + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + * ], + * 
"timestampField": "timestamp" + * } + */ + @Multiline + private String profileWithEventTime; + private DefaultMessageRouter router; private Context context; @@ -193,6 +221,7 @@ public class DefaultMessageRouterTest { JSONParser parser = new JSONParser(); this.messageOne = (JSONObject) parser.parse(inputOne); this.messageTwo = (JSONObject) parser.parse(inputTwo); + this.messageWithTimestamp = (JSONObject) parser.parse(inputWithTimestamp); } @Test @@ -268,4 +297,30 @@ public class DefaultMessageRouterTest { assertEquals("good-profile", route1.getProfileDefinition().getProfile()); assertEquals(messageOne.get("ip_src_addr"), route1.getEntity()); } + + /** + * + */ + @Test + public void testMessageWithTimestamp() throws Exception { + List<MessageRoute> routes = router.route(messageWithTimestamp, createConfig(profileWithEventTime), context);; + + assertEquals(1, routes.size()); + MessageRoute route1 = routes.get(0); + assertEquals("profile-one", route1.getProfileDefinition().getProfile()); + assertEquals(messageWithTimestamp.get("ip_src_addr"), route1.getEntity()); + assertEquals(messageWithTimestamp.get("timestamp"), route1.getTimestamp()); + } + + /** + * If the timestamp of a message cannot be determined, it should not be routed. + * + * <p>This might happen when using event time and the message is missing the timestamp field. 
+ */ + @Test + public void testMessageWithMissingTimestamp() throws Exception { + // messageOne does not contain a timestamp + List<MessageRoute> routes = router.route(messageOne, createConfig(profileWithEventTime), context); + assertEquals(0, routes.size()); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 0d1f27e..f9c0edd 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -363,9 +363,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { activeFlushSignal.update(timestamp); // distribute the message - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, message, timestamp); synchronized (messageDistributor) { - messageDistributor.distribute(message, timestamp, route, getStellarContext()); + messageDistributor.distribute(route, getStellarContext()); } LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp); http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index f28411f..f57deb7 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -97,11 +97,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { private transient MessageRouter router; /** - * Responsible for creating the {@link Clock}. - */ - private transient ClockFactory clockFactory; - - /** * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt. */ public ProfileSplitterBolt(String zookeeperUrl) { @@ -114,10 +109,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { this.collector = collector; this.parser = new JSONParser(); this.router = new DefaultMessageRouter(getStellarContext()); - this.clockFactory = new DefaultClockFactory(); } - private Context getStellarContext() { + public Context getStellarContext() { Map<String, Object> global = getConfigurations().getGlobalConfig(); return new Context.Builder() .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) @@ -162,15 +156,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { // ensure there is a valid profiler configuration ProfilerConfig config = getProfilerConfig(); if(config != null && config.getProfiles().size() > 0) { + routeMessage(input, message, config); - // what time is it? - Clock clock = clockFactory.createClock(config); - Optional<Long> timestamp = clock.currentTimeMillis(message); - - // route the message. if a message does not contain the timestamp field, it cannot be routed. - timestamp.ifPresent(ts -> routeMessage(input, message, config, ts)); - - } else { + } else if(LOG.isDebugEnabled()) { LOG.debug("No Profiler configuration found. 
Nothing to do."); } } @@ -180,24 +168,23 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { * @param input The input tuple on which to anchor. * @param message The telemetry message. * @param config The Profiler configuration. - * @param timestamp The timestamp of the telemetry message. */ - private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config, Long timestamp) { + private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config) { // emit a tuple for each 'route' List<MessageRoute> routes = router.route(message, config, getStellarContext()); for (MessageRoute route : routes) { - Values values = createValues(message, timestamp, route); + Values values = createValues(route); collector.emit(input, values); LOG.debug("Found route for message; profile={}, entity={}, timestamp={}", route.getProfileDefinition().getProfile(), route.getEntity(), - timestamp); + route.getTimestamp()); } - LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp); + LOG.debug("Found {} route(s) for message", routes.size()); } /** @@ -222,22 +209,20 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { /** * Creates the {@link Values} attached to the outgoing tuple. * - * @param message The telemetry message. - * @param timestamp The timestamp of the message. * @param route The route the message must take. 
* @return */ - private Values createValues(JSONObject message, Long timestamp, MessageRoute route) { + private Values createValues(MessageRoute route) { // the order here must match `declareOutputFields` - return new Values(message, timestamp, route.getEntity(), route.getProfileDefinition()); + return new Values(route.getMessage(), route.getTimestamp(), route.getEntity(), route.getProfileDefinition()); } protected MessageRouter getMessageRouter() { return router; } - public void setClockFactory(ClockFactory clockFactory) { - this.clockFactory = clockFactory; + public void setRouter(MessageRouter router) { + this.router = router; } } http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 3d009fb..3132ae6 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -127,7 +127,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { bolt.execute(tupleWindow); // the message should have been extracted from the tuple and passed to the MessageDistributor - verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any()); + verify(distributor).distribute(any(MessageRoute.class), any()); } http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java index bf81923..d57e825 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java @@ -23,6 +23,7 @@ package org.apache.metron.profiler.bolt; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.DefaultMessageRouter; import org.apache.metron.profiler.clock.FixedClockFactory; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.test.bolt.BaseBoltTest; @@ -428,7 +429,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { bolt.prepare(new HashMap<>(), topologyContext, outputCollector); // set the clock factory AFTER calling prepare to use the fixed clock factory - bolt.setClockFactory(new FixedClockFactory(timestamp)); + DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext()); + router.setClockFactory(new FixedClockFactory(timestamp)); + bolt.setRouter(router); return bolt; }
