METRON-1505 Intermittent Profiler Integration Test Failure (nickwallen) closes apache/metron#977
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/46bc63db Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/46bc63db Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/46bc63db Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 46bc63dbcfe9f0ddabfd4821958962a2dac9378e Parents: ab4f8e6 Author: nickwallen <n...@nickallen.org> Authored: Sat Apr 7 11:28:01 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Sat Apr 7 11:28:01 2018 -0400 ---------------------------------------------------------------------- .../profiler/DefaultMessageDistributor.java | 54 +++- .../src/main/flux/profiler/remote.yaml | 2 + .../profiler/bolt/ProfileBuilderBolt.java | 149 +++++++--- .../profiler/bolt/ProfileSplitterBolt.java | 1 - .../config/zookeeper/percentiles/profiler.json | 12 - .../processing-time-test/profiler.json | 11 + .../zookeeper/readme-example-1/profiler.json | 17 -- .../zookeeper/readme-example-2/profiler.json | 18 -- .../zookeeper/readme-example-3/profiler.json | 11 - .../zookeeper/readme-example-4/profiler.json | 11 - .../profiler/bolt/ProfileBuilderBoltTest.java | 130 +++------ .../integration/ProfilerIntegrationTest.java | 274 +++++-------------- .../configuration/profiler/ProfileConfig.java | 49 ++-- .../ZKConfigurationsCacheIntegrationTest.java | 4 +- .../org/apache/metron/hbase/bolt/HBaseBolt.java | 22 +- 15 files changed, 319 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index ea5126f..70f4228 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -25,6 +25,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; @@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -74,7 +74,7 @@ public class DefaultMessageDistributor implements MessageDistributor { * messages. Once it has not received messages for a period of time, it is * moved to the expired cache. */ - private transient Cache<String, ProfileBuilder> activeCache; + private transient Cache<Integer, ProfileBuilder> activeCache; /** * A cache of expired profiles. @@ -85,7 +85,7 @@ public class DefaultMessageDistributor implements MessageDistributor { * can flush the state of the expired profile. If the client does not flush * the expired profiles, this state will be lost forever. */ - private transient Cache<String, ProfileBuilder> expiredCache; + private transient Cache<Integer, ProfileBuilder> expiredCache; /** * Create a new message distributor. @@ -222,7 +222,7 @@ public class DefaultMessageDistributor implements MessageDistributor { * @param cache The cache to flush. * @return The measurements captured when flushing the profiles. */ - private List<ProfileMeasurement> flushCache(Cache<String, ProfileBuilder> cache) { + private List<ProfileMeasurement> flushCache(Cache<Integer, ProfileBuilder> cache) { List<ProfileMeasurement> measurements = new ArrayList<>(); for(ProfileBuilder profileBuilder: cache.asMap().values()) { @@ -262,11 +262,19 @@ public class DefaultMessageDistributor implements MessageDistributor { /** * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache. * + * <p>The cache key is built using the hash codes of the profile and entity name. If the profile + * definition is ever changed, the same cache entry will not be reused. This ensures that no + * state can be carried over from the old definition into the new, which might result in an + * invalid profile measurement. + * * @param profile The profile definition. * @param entity The entity. */ - private String cacheKey(ProfileConfig profile, String entity) { - return format("%s:%s", profile, entity); + private int cacheKey(ProfileConfig profile, String entity) { + return new HashCodeBuilder(17, 37) + .append(profile) + .append(entity) + .hashCode(); } public DefaultMessageDistributor withPeriodDurationMillis(long periodDurationMillis) { @@ -281,29 +289,45 @@ public class DefaultMessageDistributor implements MessageDistributor { /** * A listener that is notified when profiles expire from the active cache. */ - private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> { + private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> { @Override - public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) { + public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) { - String key = notification.getKey(); ProfileBuilder expired = notification.getValue(); + LOG.warn("Profile expired from active cache; profile={}, entity={}", + expired.getDefinition().getProfile(), + expired.getEntity()); - LOG.warn("Profile expired from active cache; key={}", key); - expiredCache.put(key, expired); + // add the profile to the expired cache + expiredCache.put(notification.getKey(), expired); } } /** * A listener that is notified when profiles expire from the active cache. */ - private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> { + private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> { @Override - public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) { + public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) { + + if(notification.wasEvicted()) { + + // the expired profile was NOT flushed in time + ProfileBuilder expired = notification.getValue(); + LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}", + expired.getDefinition().getProfile(), + expired.getEntity()); - String key = notification.getKey(); - LOG.debug("Profile removed from expired cache; key={}", key); + } else { + + // the expired profile was flushed successfully + ProfileBuilder expired = notification.getValue(); + LOG.debug("Expired profile successfully flushed; profile={}, entity={}", + expired.getDefinition().getProfile(), + expired.getEntity()); + } } } } http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml index 83c9fde..6ad007b 100644 --- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml +++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml @@ -160,6 +160,8 @@ bolts: args: [ref: "windowLag"] - name: "withMaxNumberOfRoutes" args: [${profiler.max.routes.per.bolt}] + - name: "withTimestampField" + args: ["timestamp"] - id: "hbaseBolt" className: "org.apache.metron.hbase.bolt.HBaseBolt" http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index ffe823f..fb3d2d0 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -42,13 +42,13 @@ import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.zookeeper.SimpleEventListener; import org.apache.metron.zookeeper.ZKCache; -import org.apache.storm.Config; +import org.apache.storm.StormTimer; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseWindowedBolt; import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; import org.apache.storm.windowing.TupleWindow; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -60,6 +60,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; import static java.lang.String.format; import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD; @@ -127,6 +129,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { /** * Distributes messages to the profile builders. + * + * <p>Since expired profiles are flushed on a separate thread, all access to this + * {@code MessageDistributor} needs to be protected. */ private MessageDistributor messageDistributor; @@ -145,9 +150,21 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { private List<ProfileMeasurementEmitter> emitters; /** - * Signals when it is time to flush. + * Signals when it is time to flush the active profiles. + */ + private FlushSignal activeFlushSignal; + + /** + * A timer that flushes expired profiles on a regular interval. The expired profiles + * are flushed on a separate thread. + * + * <p>Flushing expired profiles ensures that any profiles that stop receiving messages + * for an extended period of time will continue to be flushed. + * + * <p>This introduces concurrency issues as the bolt is no longer single threaded. Due + * to this, all access to the {@code MessageDistributor} needs to be protected. */ - private FlushSignal flushSignal; + private StormTimer expiredFlushTimer; public ProfileBuilderBolt() { this.emitters = new ArrayList<>(); @@ -183,16 +200,26 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { this.parser = new JSONParser(); this.messageDistributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes); this.configurations = new ProfilerConfigurations(); - this.flushSignal = new FixedFrequencyFlushSignal(periodDurationMillis); + this.activeFlushSignal = new FixedFrequencyFlushSignal(periodDurationMillis); setupZookeeper(); + startExpiredFlushTimer(); } @Override public void cleanup() { - zookeeperCache.close(); - zookeeperClient.close(); + try { + zookeeperCache.close(); + zookeeperClient.close(); + expiredFlushTimer.close(); + + } catch(Throwable e) { + LOG.error("Exception when cleaning up", e); + } } + /** + * Setup connectivity to Zookeeper which provides the necessary configuration for the bolt. + */ private void setupZookeeper() { try { if (zookeeperClient == null) { @@ -248,18 +275,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { emitters.forEach(emitter -> emitter.declareOutputFields(declarer)); } - /** - * Defines the frequency at which the bolt will receive tick tuples. Tick tuples are - * used to control how often a profile is flushed. - */ - @Override - public Map<String, Object> getComponentConfiguration() { - - Map<String, Object> conf = super.getComponentConfiguration(); - conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TimeUnit.MILLISECONDS.toSeconds(profileTimeToLiveMillis)); - return conf; - } - private Context getStellarContext() { Map<String, Object> global = getConfigurations().getGlobalConfig(); @@ -282,24 +297,12 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { // handle each tuple in the window for(Tuple tuple : window.get()) { - - if(TupleUtils.isTick(tuple)) { - handleTick(); - - } else { - handleMessage(tuple); - } + handleMessage(tuple); } - // time to flush? - if(flushSignal.isTimeToFlush()) { - flushSignal.reset(); - - // flush the active profiles - List<ProfileMeasurement> measurements = messageDistributor.flush(); - emitMeasurements(measurements); - - LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size()); + // time to flush active profiles? + if(activeFlushSignal.isTimeToFlush()) { + flushActive(); } } catch (Throwable e) { @@ -310,17 +313,37 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { } /** - * Flush all expired profiles when a 'tick' is received. + * Flush all active profiles. + */ + protected void flushActive() { + activeFlushSignal.reset(); + + // flush the active profiles + List<ProfileMeasurement> measurements; + synchronized(messageDistributor) { + measurements = messageDistributor.flush(); + emitMeasurements(measurements); + } + + LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size()); + + } + + /** + * Flushes all expired profiles. * - * If a profile has not received a message for an extended period of time then it is + * <p>If a profile has not received a message for an extended period of time then it is * marked as expired. Periodically we need to flush these expired profiles to ensure * that their state is not lost. */ - private void handleTick() { + protected void flushExpired() { // flush the expired profiles - List<ProfileMeasurement> measurements = messageDistributor.flushExpired(); - emitMeasurements(measurements); + List<ProfileMeasurement> measurements; + synchronized (messageDistributor) { + measurements = messageDistributor.flushExpired(); + emitMeasurements(measurements); + } LOG.debug("Flushed expired profiles and found {} measurement(s).", measurements.size()); } @@ -339,11 +362,13 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class); // keep track of time - flushSignal.update(timestamp); + activeFlushSignal.update(timestamp); // distribute the message MessageRoute route = new MessageRoute(definition, entity); - messageDistributor.distribute(message, timestamp, route, getStellarContext()); + synchronized (messageDistributor) { + messageDistributor.distribute(message, timestamp, route, getStellarContext()); + } LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp); } @@ -395,10 +420,46 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { return value; } + /** + * Converts milliseconds to seconds and handles an ugly cast. + * + * @param millis Duration in milliseconds. + * @return Duration in seconds. + */ + private int toSeconds(long millis) { + return (int) TimeUnit.MILLISECONDS.toSeconds(millis); + } + + /** + * Creates a timer that regularly flushes expired profiles on a separate thread. + */ + private void startExpiredFlushTimer() { + + expiredFlushTimer = createTimer("flush-expired-profiles-timer"); + expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired()); + } + + /** + * Creates a timer that can execute a task on a fixed interval. + * + * <p>If the timer encounters an exception, the entire process will be killed. + * + * @param name The name of the timer. + * @return The timer. + */ + private StormTimer createTimer(String name) { + + return new StormTimer(name, (thread, exception) -> { + String msg = String.format("Unexpected exception in timer task; timer=%s", name); + LOG.error(msg, exception); + Utils.exitProcess(1, msg); + }); + } + @Override public BaseWindowedBolt withTumblingWindow(BaseWindowedBolt.Duration duration) { - // need to capture the window duration for setting the flush count down + // need to capture the window duration to validate it along with other profiler settings this.windowDurationMillis = duration.value; return super.withTumblingWindow(duration); } @@ -464,7 +525,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { } public ProfileBuilderBolt withFlushSignal(FlushSignal flushSignal) { - this.flushSignal = flushSignal; + this.activeFlushSignal = flushSignal; return this; } http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index 4e62eee..a92a432 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -21,7 +21,6 @@ package org.apache.metron.profiler.bolt; import org.apache.metron.common.bolt.ConfiguredProfilerBolt; -import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.profiler.DefaultMessageRouter; import org.apache.metron.profiler.MessageRoute; http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json deleted file mode 100644 index 8a54834..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/percentiles/profiler.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": [ - { - "profile": "percentiles", - "foreach": "ip_src_addr", - "onlyif": "protocol == 'HTTP'", - "init": { "s": "STATS_INIT(100)" }, - "update": { "s": "STATS_ADD(s, length)" }, - "result": "STATS_PERCENTILE(s, 0.7)" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json new file mode 100644 index 0000000..e75ec0f --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json @@ -0,0 +1,11 @@ +{ + "profiles": [ + { + "profile": "processing-time-test", + "foreach": "ip_src_addr", + "init": { "counter": "0" }, + "update": { "counter": "counter + 1" }, + "result": "counter" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json deleted file mode 100644 index 96c60a1..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-1/profiler.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "profiles": [ - { - "profile": "example1", - "foreach": "ip_src_addr", - "onlyif": "protocol == 'HTTP'", - "init": { - "total_bytes": 0.0 - }, - "update": { - "total_bytes": "total_bytes + bytes_in" - }, - "result": "total_bytes", - "expires": 30 - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json deleted file mode 100644 index e5d8f39..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-2/profiler.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "profiles": [ - { - "profile": "example2", - "foreach": "ip_src_addr", - "onlyif": "protocol == 'DNS' or protocol == 'HTTP'", - "init": { - "num_dns": 1.0, - "num_http": 1.0 - }, - "update": { - "num_dns": "num_dns + (if protocol == 'DNS' then 1 else 0)", - "num_http": "num_http + (if protocol == 'HTTP' then 1 else 0)" - }, - "result": "num_dns / num_http" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json deleted file mode 100644 index 67cdefd..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-3/profiler.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "profiles": [ - { - "profile": "example3", - "foreach": "ip_src_addr", - "onlyif": "protocol == 'HTTP'", - "update": { "s": "STATS_ADD(s, length)" }, - "result": "STATS_MEAN(s)" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json deleted file mode 100644 index b003ce0..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "profiles": [ - { - "profile": "example4", - "foreach": "ip_src_addr", - "onlyif": "protocol == 'HTTP'", - "update": { "s": "STATS_ADD(s, length)" }, - "result": "s" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 78e20e0..3d009fb 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -67,6 +66,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { private ProfileConfig profile2; private ProfileMeasurementEmitter emitter; private ManualFlushSignal flushSignal; + private ProfileMeasurement measurement; @Before public void setup() throws Exception { @@ -95,6 +95,12 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { .withUpdate(Collections.singletonMap("x", "x + 1")) .withResult("x"); + measurement = new ProfileMeasurement() + .withEntity("entity1") + .withProfileName("profile1") + .withPeriod(1000, 500, TimeUnit.MILLISECONDS) + .withProfileValue(22); + flushSignal = new ManualFlushSignal(); flushSignal.setFlushNow(false); } @@ -127,23 +133,16 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { /** * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor} - * and emit the {@code ProfileMeasurement} values. + * and emit the {@code ProfileMeasurement} values from all active profiles. */ @Test - public void testEmitWhenFlush() throws Exception { + public void testFlushActiveProfiles() throws Exception { ProfileBuilderBolt bolt = createBolt(); - // create a profile measurement - ProfileMeasurement m = new ProfileMeasurement() - .withEntity("entity1") - .withProfileName("profile1") - .withPeriod(1000, 500, TimeUnit.MILLISECONDS) - .withProfileValue(22); - // create a mock that returns the profile measurement above MessageDistributor distributor = mock(MessageDistributor.class); - when(distributor.flush()).thenReturn(Collections.singletonList(m)); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); bolt.withMessageDistributor(distributor); // signal the bolt to flush @@ -157,30 +156,23 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { // a profile measurement should be emitted by the bolt List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); assertEquals(1, measurements.size()); - assertEquals(m, measurements.get(0)); + assertEquals(measurement, measurements.get(0)); } /** * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted. */ @Test - public void testDoNotEmitWhenNoFlush() throws Exception { + public void testDoNotFlushActiveProfiles() throws Exception { ProfileBuilderBolt bolt = createBolt(); - // create a profile measurement - ProfileMeasurement m = new ProfileMeasurement() - .withEntity("entity1") - .withProfileName("profile1") - .withPeriod(1000, 500, TimeUnit.MILLISECONDS) - .withProfileValue(22); - - // create a mock that returns the profile measurement above + // create a mock where flush() returns the profile measurement above MessageDistributor distributor = mock(MessageDistributor.class); - when(distributor.flush()).thenReturn(Collections.singletonList(m)); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); bolt.withMessageDistributor(distributor); - // no flush signal + // there is no flush signal flushSignal.setFlushNow(false); // execute the bolt @@ -193,6 +185,29 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { } /** + * Expired profiles should be flushed regularly, even if no input telemetry + * has been received. + */ + @Test + public void testFlushExpiredProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock where flushExpired() returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // execute test by flushing expired profiles. this is normally triggered by a timer task. + bolt.flushExpired(); + + // a profile measurement should be emitted by the bolt + List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(measurement, measurements.get(0)); + } + + /** * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. */ @@ -232,61 +247,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { verify(outputCollector, times(1)).emit(eq("destination3"), any()); } - @Test - public void testFlushExpiredWithTick() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock - MessageDistributor distributor = mock(MessageDistributor.class); - bolt.withMessageDistributor(distributor); - - // tell the bolt to flush on the first window - flushSignal.setFlushNow(true); - - // execute the bolt; include a tick tuple in the window - Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L); - TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple()); - bolt.execute(tupleWindow); - - // ensure the expired profiles were flushed when the tick tuple was received - verify(distributor).flushExpired(); - } - - @Test - public void testFlushExpiredWithNoTick() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock - MessageDistributor distributor = mock(MessageDistributor.class); - bolt.withMessageDistributor(distributor); - - // tell the bolt to flush on the first window - flushSignal.setFlushNow(true); - - // execute the bolt; NO tick tuple - Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L); - TupleWindow tupleWindow = createWindow(tuple1); - bolt.execute(tupleWindow); - - // there was no tick tuple; the expired profiles should NOT have been flushed - verify(distributor, times(0)).flushExpired(); - } - - /** - * Creates a mock tick tuple to use for testing. - * @return A mock tick tuple. - */ - private Tuple mockTickTuple() { - - Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn("__system"); - when(tuple.getSourceStreamId()).thenReturn("__tick"); - - return tuple; - } - /** * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. * @@ -334,18 +294,6 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { */ private ProfileBuilderBolt createBolt() throws IOException { - return createBolt(30, TimeUnit.SECONDS); - } - - /** - * Create a ProfileBuilderBolt to test. - * - * @param windowDuration The event window duration. - * @param windowDurationUnits The units of the event window duration. - * @return A {@link ProfileBuilderBolt} to test. - */ - private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException { - // defines the zk configurations accessible from the bolt ProfilerConfigurations configurations = new ProfilerConfigurations(); configurations.updateGlobalConfig(Collections.emptyMap()); @@ -359,7 +307,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { .withEmitter(emitter) .withProfilerConfigurations(configurations) .withPeriodDuration(1, TimeUnit.MINUTES) - .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits)); + .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)); bolt.prepare(new HashMap<>(), topologyContext, outputCollector); // set the flush signal AFTER calling 'prepare' http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index c48a3e9..8f5ced3 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -20,9 +20,6 @@ package org.apache.metron.profiler.integration; -import com.google.common.base.Joiner; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.commons.math.util.MathUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; @@ -41,10 +38,8 @@ import org.apache.metron.profiler.hbase.ColumnBuilder; import org.apache.metron.profiler.hbase.RowKeyBuilder; import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; -import org.apache.metron.statistics.OnlineStatisticsProvider; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -61,6 +56,7 @@ import static com.google.code.tempusfugit.temporal.Timeout.timeout; import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * An integration test of the Profiler topology. @@ -70,247 +66,103 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler/src/test"; private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml"; - /** - * { - * "ip_src_addr": "10.0.0.1", - * "protocol": "HTTPS", - * "length": 10, - * "bytes_in": 234 - * } - */ - @Multiline - private static String message1; - - /** - * { - * "ip_src_addr": "10.0.0.2", - * "protocol": "HTTP", - * "length": 20, - * "bytes_in": 390 - * } - */ - @Multiline - private static String message2; - - /** - * { - * "ip_src_addr": "10.0.0.3", - * "protocol": "DNS", - * "length": 30, - * "bytes_in": 560 - * } - */ - @Multiline - private static String message3; - - private static ColumnBuilder columnBuilder; - private static ZKServerComponent zkComponent; - private static FluxTopologyComponent fluxComponent; - private static KafkaComponent kafkaComponent; - private static ConfigUploadComponent configUploadComponent; - private static ComponentRunner runner; - private static MockHTable profilerTable; + public static final long startAt = 10; + public static final String entity = "10.0.0.1"; private static final String tableName = "profiler"; private static final String columnFamily = "P"; - private static final double epsilon = 0.001; private static final String inputTopic = Constants.INDEXING_TOPIC; private static final String outputTopic = "profiles"; private static final int saltDivisor = 10; - private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5); + private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1); private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15); - private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); + private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10); + private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15); private static final long maxRoutesPerBolt = 100000; - /** - * Tests the first example contained within the README. - */ - @Test - public void testExample1() throws Exception { - - uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1"); - - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message1, message1); - kafkaComponent.writeMessages(inputTopic, message2, message2, message2); - kafkaComponent.writeMessages(inputTopic, message3, message3, message3); - - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(180))); - - // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value - List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, - columnBuilder.getColumnQualifier("value"), Double.class); - - // verify - there are 3 'HTTP' each with 390 bytes - Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(390.0 * 3, val, epsilon) - )); - } - - /** - * Tests the second example contained within the README. - */ - @Test - public void testExample2() throws Exception { - - uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2"); - - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message1, message1); - kafkaComponent.writeMessages(inputTopic, message2, message2, message2); - kafkaComponent.writeMessages(inputTopic, message3, message3, message3); - - // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3 - final int expected = 2; - - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected, - timeout(seconds(90))); - - // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS' - List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, - columnBuilder.getColumnQualifier("value"), Double.class); - - // verify - 10.0.0.3 -> 1/4 - Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals), - actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon) - )); - - // verify - 10.0.0.2 -> 4/1 - Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals), - actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon) - )); - } - - /** - * Tests the third example contained within the README. - */ - @Test - public void testExample3() throws Exception { - - uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3"); + private static ColumnBuilder columnBuilder; + private static ZKServerComponent zkComponent; + private static FluxTopologyComponent fluxComponent; + private static KafkaComponent kafkaComponent; + private static ConfigUploadComponent configUploadComponent; + private static ComponentRunner runner; + private static MockHTable profilerTable; - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message1, message1); - kafkaComponent.writeMessages(inputTopic, message2, message2, message2); - kafkaComponent.writeMessages(inputTopic, message3, message3, message3); - - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(90))); - - // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value - List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, - columnBuilder.getColumnQualifier("value"), Double.class); - - // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals), - actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon) - )); - } + private static String message1; + private static String message2; + private static String message3; /** - * Tests the fourth example contained within the README. + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles based on when the telemetry was processed. + * + * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler + * to use processing time. */ @Test - public void testExample4() throws Exception { + public void testProcessingTime() throws Exception { - uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4"); + // upload the config to zookeeper + uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message1, message1); - kafkaComponent.writeMessages(inputTopic, message2, message2, message2); - kafkaComponent.writeMessages(inputTopic, message3, message3, message3); - - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(90))); - - // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value - byte[] column = columnBuilder.getColumnQualifier("value"); - List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class); - - // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals), - actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon) - )); - } - @Test - public void testPercentiles() throws Exception { - - uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles"); + // the messages that will be applied to the profile + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message1, message1); - kafkaComponent.writeMessages(inputTopic, message2, message2, message2); - kafkaComponent.writeMessages(inputTopic, message3, message3, message3); + // storm needs at least one message to close its event window + int attempt = 0; + while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(90))); + // sleep, at least beyond the current window + Thread.sleep(windowDurationMillis + windowLagMillis); - List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, - columnBuilder.getColumnQualifier("value"), Double.class); + // send another message to help close the current event window + kafkaComponent.writeMessages(inputTopic, message2); + } - // verify - the 70th percentile of x3, 20s = 20.0 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals), - actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon))); + // validate what was flushed + List<Integer> actuals = read( + profilerTable.getPutLog(), + columnFamily, + columnBuilder.getColumnQualifier("value"), + Integer.class); + assertEquals(1, actuals.size()); + assertTrue(actuals.get(0) >= 3); } /** - * The Profiler can optionally perform event time processing. With event time processing, + * The Profiler can generate profiles using event time. With event time processing, * the Profiler uses timestamps contained in the source telemetry. * * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler * from which field the timestamp should be extracted. */ @Test - public void testEventTimeProcessing() throws Exception { - - // constants used for the test - final long startAt = 10; - final String entity = "10.0.0.1"; - final String profileName = "event-time-test"; - - // create some messages that contain a timestamp - a really old timestamp; close to 1970 - String message1 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt) - .build() - .toJSONString(); - - String message2 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + 100) - .build() - .toJSONString(); + public void testEventTime() throws Exception { + // upload the profiler config to zookeeper uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1, message2); + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); - // verify - ensure the profile is being persisted - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(90))); + // wait until the profile is flushed + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); List<Put> puts = profilerTable.getPutLog(); assertEquals(1, puts.size()); // inspect the row key to ensure the profiler used event time correctly. the timestamp // embedded in the row key should match those in the source telemetry - byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt); + byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); byte[] actualRowKey = puts.get(0).getRow(); String msg = String.format("expected '%s', got '%s'", new String(expectedRowKey, "UTF-8"), @@ -364,6 +216,26 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @BeforeClass public static void setupBeforeClass() throws UnableToStartException { + + // create some messages that contain a timestamp - a really old timestamp; close to 1970 + message1 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt) + .build() + .toJSONString(); + + message2 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt + 100) + .build() + .toJSONString(); + + message3 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt + (windowDurationMillis * 2)) + .build() + .toJSONString(); + columnBuilder = new ValueOnlyColumnBuilder(columnFamily); // storm topology properties http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java index 6205fbf..f5b46e6 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java @@ -18,6 +18,8 @@ package org.apache.metron.common.configuration.profiler; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; import java.io.Serializable; import java.util.ArrayList; @@ -225,32 +227,39 @@ public class ProfileConfig implements Serializable { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } - ProfileConfig that = (ProfileConfig) o; + if (o == null || getClass() != o.getClass()) { + return false; + } - if (profile != null ? !profile.equals(that.profile) : that.profile != null) return false; - if (foreach != null ? !foreach.equals(that.foreach) : that.foreach != null) return false; - if (onlyif != null ? !onlyif.equals(that.onlyif) : that.onlyif != null) return false; - if (init != null ? !init.equals(that.init) : that.init != null) return false; - if (update != null ? !update.equals(that.update) : that.update != null) return false; - if (groupBy != null ? !groupBy.equals(that.groupBy) : that.groupBy != null) return false; - if (result != null ? !result.equals(that.result) : that.result != null) return false; - return expires != null ? expires.equals(that.expires) : that.expires == null; + ProfileConfig that = (ProfileConfig) o; + return new EqualsBuilder() + .append(profile, that.profile) + .append(foreach, that.foreach) + .append(onlyif, that.onlyif) + .append(init, that.init) + .append(update, that.update) + .append(groupBy, that.groupBy) + .append(result, that.result) + .append(expires, that.expires) + .isEquals(); } @Override public int hashCode() { - int result1 = profile != null ? profile.hashCode() : 0; - result1 = 31 * result1 + (foreach != null ? foreach.hashCode() : 0); - result1 = 31 * result1 + (onlyif != null ? onlyif.hashCode() : 0); - result1 = 31 * result1 + (init != null ? init.hashCode() : 0); - result1 = 31 * result1 + (update != null ? update.hashCode() : 0); - result1 = 31 * result1 + (groupBy != null ? groupBy.hashCode() : 0); - result1 = 31 * result1 + (result != null ? result.hashCode() : 0); - result1 = 31 * result1 + (expires != null ? expires.hashCode() : 0); - return result1; + return new HashCodeBuilder(17, 37) + .append(profile) + .append(foreach) + .append(onlyif) + .append(init) + .append(update) + .append(groupBy) + .append(result) + .append(expires) + .toHashCode(); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java index ec4a98a..5240d7a 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java @@ -154,7 +154,7 @@ public class ZKConfigurationsCacheIntegrationTest { } { //profiler - byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/readme-example-1/profiler.json"))); + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json"))); ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client); } { @@ -284,7 +284,7 @@ public class ZKConfigurationsCacheIntegrationTest { } //profiler { - File inFile = new File(profilerDir, "/readme-example-1/profiler.json"); + File inFile = new File(profilerDir, "/event-time-test/profiler.json"); ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class); ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); http://git-wip-us.apache.org/repos/asf/metron/blob/46bc63db/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java index d16e2f6..6953b18 100644 --- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java +++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java @@ -24,7 +24,7 @@ import java.lang.invoke.MethodHandles; import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Optional; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Durability; import org.apache.metron.hbase.HTableProvider; @@ -77,6 +77,8 @@ public class HBaseBolt extends BaseRichBolt { /** * The name of the class that should be used as a table provider. + * + * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'. */ protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider"; @@ -126,6 +128,8 @@ public class HBaseBolt extends BaseRichBolt { @Override public Map<String, Object> getComponentConfiguration() { + LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs); + Config conf = new Config(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs); return conf; @@ -136,7 +140,13 @@ public class HBaseBolt extends BaseRichBolt { this.collector = collector; this.batchHelper = new BatchHelper(batchSize, collector); - TableProvider provider = this.tableProvider == null ?getTableProvider(tableProviderClazzName):this.tableProvider; + TableProvider provider; + if(this.tableProvider == null) { + provider = createTableProvider(tableProviderClazzName); + } else { + provider = this.tableProvider; + } + hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName); } @@ -147,6 +157,8 @@ public class HBaseBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { + LOG.trace("Received a tuple."); + try { if (batchHelper.shouldHandle(tuple)) { save(tuple); @@ -179,12 +191,15 @@ public class HBaseBolt extends BaseRichBolt { } batchHelper.addBatch(tuple); + LOG.debug("Added mutation to the batch; size={}", batchHelper.getBatchSize()); } /** * Flush all saved operations. */ private void flush() { + LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize()); + this.hbaseClient.mutate(); batchHelper.ack(); } @@ -193,7 +208,8 @@ public class HBaseBolt extends BaseRichBolt { * Creates a TableProvider based on a class name. * @param connectorImpl The class name of a TableProvider */ - private static TableProvider getTableProvider(String connectorImpl) { + private static TableProvider createTableProvider(String connectorImpl) { + LOG.trace("Creating table provider; className={}", connectorImpl); // if class name not defined, use a reasonable default if(StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {