Repository: metron Updated Branches: refs/heads/master 269b91d01 -> 5bfc08c57
METRON-1792 Simplify Profile Definitions in Integration Tests (nickwallen) closes apache/metron#1211 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5bfc08c5 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5bfc08c5 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5bfc08c5 Branch: refs/heads/master Commit: 5bfc08c57f1129b7d185ac7257197775ed3bdb5e Parents: 269b91d Author: nickwallen <n...@nickallen.org> Authored: Mon Oct 8 18:32:30 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Mon Oct 8 18:32:30 2018 -0400 ---------------------------------------------------------------------- .../zookeeper/event-time-test/profiler.json | 19 ---- .../processing-time-test/profiler.json | 11 -- .../zookeeper/profile-with-stats/profiler.json | 12 -- .../integration/ConfigUploadComponent.java | 31 ++++-- .../integration/ProfilerIntegrationTest.java | 109 ++++++++++++++++--- .../ZKConfigurationsCacheIntegrationTest.java | 5 +- .../src/test/resources/profiler/profiler.json | 19 ++++ 7 files changed, 137 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json deleted file mode 100644 index 534b7c6..0000000 --- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "timestampField": "timestamp", - "profiles": [ - { - "profile": "count-by-ip", - "foreach": "ip_src_addr", - "init": { "count": 0 }, - "update": { "count" : "count + 1" }, - "result": "count" - }, - { - "profile": "total-count", - "foreach": "'total'", - "init": { "count": 0 }, - "update": { "count": "count + 1" }, - "result": "count" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json deleted file mode 100644 index e75ec0f..0000000 --- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "profiles": [ - { - "profile": "processing-time-test", - "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json deleted file mode 100644 index 083e73f..0000000 --- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": [ - { - "profile": "profile-with-stats", - "foreach": "'global'", - "init": { "stats": "STATS_INIT()" }, - "update": { "stats": "STATS_ADD(stats, 1)" }, - "result": "stats" - } - ], - "timestampField": "timestamp" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java index 70487a0..eae3c52 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java @@ -19,12 +19,15 @@ */ package org.apache.metron.profiler.storm.integration; +import org.apache.commons.lang3.ArrayUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.ZKServerComponent; +import java.util.Arrays; import java.util.Properties; import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; @@ -41,7 +44,8 @@ public class ConfigUploadComponent implements InMemoryComponent { private Properties topologyProperties; private String globalConfiguration; - private String profilerConfiguration; + private String profilerConfigurationPath; + private ProfilerConfig profilerConfig; @Override public void start() throws UnableToStartException { @@ -86,11 +90,17 @@ public class ConfigUploadComponent implements InMemoryComponent { * @param client The zookeeper client. */ private void uploadProfilerConfig(CuratorFramework client) throws Exception { - if (profilerConfiguration != null) { - byte[] globalConfig = readProfilerConfigFromFile(profilerConfiguration); - if (globalConfig.length > 0) { - writeProfilerConfigToZookeeper(readProfilerConfigFromFile(profilerConfiguration), client); - } + byte[] configBytes = null; + + if (profilerConfigurationPath != null) { + configBytes = readProfilerConfigFromFile(profilerConfigurationPath); + + } else if(profilerConfig != null) { + configBytes = profilerConfig.toJSON().getBytes(); + } + + if (ArrayUtils.getLength(configBytes) > 0) { + writeProfilerConfigToZookeeper(configBytes, client); } } @@ -117,8 +127,13 @@ public class ConfigUploadComponent implements InMemoryComponent { return this; } - public ConfigUploadComponent withProfilerConfiguration(String path) { - this.profilerConfiguration = path; + public ConfigUploadComponent withProfilerConfigurationPath(String path) { + this.profilerConfigurationPath = path; + return this; + } + + public ConfigUploadComponent withProfilerConfiguration(ProfilerConfig profilerConfig) { + this.profilerConfig = profilerConfig; return this; } } http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java index 64a1482..f7e75ce 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java @@ -23,6 +23,7 @@ package org.apache.metron.profiler.storm.integration; import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.io.FileUtils; import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; @@ -90,18 +91,15 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { public static final long startAt = 10; public static final String entity = "10.0.0.1"; - private static final String tableName = "profiler"; private static final String columnFamily = "P"; private static final String inputTopic = Constants.INDEXING_TOPIC; private static final String outputTopic = "profiles"; private static final int saltDivisor = 10; - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20); private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10); private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10); private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); - private static final long maxRoutesPerBolt = 100000; private static ZKServerComponent zkComponent; @@ -110,11 +108,9 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static ConfigUploadComponent configUploadComponent; private static ComponentRunner runner; private static MockHTable profilerTable; - private static String message1; private static String message2; private static String message3; - private StellarStatefulExecutor executor; /** @@ -135,9 +131,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Multiline private static String kryoSerializers; + /** + * { + * "profiles": [ + * { + * "profile": "processing-time-test", + * "foreach": "ip_src_addr", + * "init": { "counter": "0" }, + * "update": { "counter": "counter + 1" }, + * "result": "counter" + * } + * ] + * } + */ + @Multiline + private static String processingTimeProfile; + @Test public void testProcessingTime() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile)); // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); @@ -146,7 +158,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { kafkaComponent.writeMessages(inputTopic, message3); // retrieve the profile measurement using PROFILE_GET - String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))"; List<Integer> measurements = execute(profileGetExpression, List.class); // need to keep checking for measurements until the profiler has flushed one out @@ -178,7 +190,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Test public void testProcessingTimeWithTimeToLiveFlush() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile)); // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); @@ -194,7 +206,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { kafkaComponent.writeMessages(inputTopic, message3); // retrieve the profile measurement using PROFILE_GET - String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))"; List<Integer> measurements = execute(profileGetExpression, List.class); // need to keep checking for measurements until the profiler has flushed one out @@ -221,9 +233,33 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { assertEquals(3, measurements.get(0).intValue()); } + /** + * { + * "timestampField": "timestamp", + * "profiles": [ + * { + * "profile": "count-by-ip", + * "foreach": "ip_src_addr", + * "init": { "count": 0 }, + * "update": { "count" : "count + 1" }, + * "result": "count" + * }, + * { + * "profile": "total-count", + * "foreach": "'total'", + * "init": { "count": 0 }, + * "update": { "count": "count + 1" }, + * "result": "count" + * } + * ] + * } + */ + @Multiline + private static String eventTimeProfile; + @Test public void testEventTime() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(eventTimeProfile)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -264,6 +300,23 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { } /** + * { + * "profiles": [ + * { + * "profile": "profile-with-stats", + * "foreach": "'global'", + * "init": { "stats": "STATS_INIT()" }, + * "update": { "stats": "STATS_ADD(stats, 1)" }, + * "result": "stats" + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private static String profileWithStats; + + /** * The result produced by a Profile has to be serializable within Storm. If the result is not * serializable the topology will crash and burn. * @@ -272,7 +325,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { */ @Test public void testProfileWithStatsObject() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithStats)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -293,9 +346,34 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { assertTrue(results.get(0) instanceof OnlineStatisticsProvider); } + /** + * { + * "profiles": [ + * { + * "profile": "profile-with-triage", + * "foreach": "'global'", + * "update": { + * "stats": "STATS_ADD(stats, 1)" + * }, + * "result": { + * "profile": "stats", + * "triage": { + * "min": "STATS_MIN(stats)", + * "max": "STATS_MAX(stats)", + * "mean": "STATS_MEAN(stats)" + * } + * } + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private static String profileWithTriageResult; + @Test public void testProfileWithTriageResult() throws Exception { - uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result"); + uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult)); // start the topology and write test messages to kafka fluxComponent.submitTopology(); @@ -466,13 +544,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { /** * Uploads config values to Zookeeper. - * @param path The path on the local filesystem to the config values. + * @param profilerConfig The Profiler configuration. * @throws Exception */ - public void uploadConfigToZookeeper(String path) throws Exception { + public void uploadConfigToZookeeper(ProfilerConfig profilerConfig) throws Exception { configUploadComponent - .withGlobalConfiguration(path) - .withProfilerConfiguration(path) + .withProfilerConfiguration(profilerConfig) .update(); } http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java index a7dc248..ce898d3 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java @@ -119,7 +119,6 @@ public class ZKConfigurationsCacheIntegrationTest { @Multiline public static String globalConfig; - public static File profilerDir = new File("../../metron-analytics/metron-profiler-storm/src/test/config/zookeeper"); public ConfigurationsCache cache; public ZKServerComponent zkComponent; @@ -154,7 +153,7 @@ public class ZKConfigurationsCacheIntegrationTest { } { //profiler - byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json"))); + byte[] config = IOUtils.toByteArray(new FileInputStream(new File("src/test/resources/profiler/profiler.json"))); ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client); } { @@ -284,7 +283,7 @@ public class ZKConfigurationsCacheIntegrationTest { } //profiler { - File inFile = new File(profilerDir, "/event-time-test/profiler.json"); + File inFile = new File("src/test/resources/profiler/profiler.json"); ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class); ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-platform/metron-common/src/test/resources/profiler/profiler.json ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/resources/profiler/profiler.json b/metron-platform/metron-common/src/test/resources/profiler/profiler.json new file mode 100644 index 0000000..32be68d --- /dev/null +++ b/metron-platform/metron-common/src/test/resources/profiler/profiler.json @@ -0,0 +1,19 @@ +{ + "timestampField": "timestamp", + "profiles": [ + { + "profile": "count-by-ip", + "foreach": "ip_src_addr", + "init": { "count": 0 }, + "update": { "count" : "count + 1" }, + "result": "count" + }, + { + "profile": "total-count", + "foreach": "'total'", + "init": { "count": 0 }, + "update": { "count": "count + 1" }, + "result": "count" + } + ] +}