Repository: metron Updated Branches: refs/heads/feature/METRON-1699-create-batch-profiler 1545978e1 -> fa3be8d32
METRON-1787 Input Time Constraints for Batch Profiler (nickwallen) closes apache/metron#1209 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fa3be8d3 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fa3be8d3 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fa3be8d3 Branch: refs/heads/feature/METRON-1699-create-batch-profiler Commit: fa3be8d32ccadcd11edad046cbd063cec3a20624 Parents: 1545978 Author: nickwallen <n...@nickallen.org> Authored: Wed Sep 26 18:13:30 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed Sep 26 18:13:30 2018 -0400 ---------------------------------------------------------------------- .../clock/EventTimeOnlyClockFactory.java | 58 ++++++++ .../clock/EventTimeOnlyClockFactoryTest.java | 61 +++++++++ .../metron-profiler-spark/README.md | 19 +++ .../metron/profiler/spark/BatchProfiler.java | 38 +++++- .../profiler/spark/BatchProfilerConfig.java | 6 +- .../metron/profiler/spark/TimestampParser.java | 55 ++++++++ .../spark/function/MessageRouterFunction.java | 106 +++++++++++++-- .../spark/BatchProfilerIntegrationTest.java | 58 +++++++- .../profiler/spark/TimestampParserTest.java | 67 ++++++++++ .../function/MessageRouterFunctionTest.java | 133 +++++++++++++++++-- 10 files changed, 579 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java new file mode 100644 index 0000000..2f9ca7c --- /dev/null +++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.clock; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; + +import java.io.Serializable; + +/** + * Creates a {@link Clock} based on the profiler configuration. This should + * be used in cases where only event time is acceptable. + * + * <p>If the Profiler is configured to use event time, an {@link EventTimeClock} will + * be created. Otherwise, an {@link IllegalStateException} is thrown. + */ +public class EventTimeOnlyClockFactory implements ClockFactory, Serializable { + + /** + * If the Profiler is configured to use event time, an {@link EventTimeClock} is created. + * Otherwise, an {@link IllegalStateException} is thrown. + * + * @param config The profiler configuration. + * @return The appropriate Clock based on the profiler configuration. + * @throws IllegalStateException If the profiler configuration is set to system time. 
+ */ + @Override + public Clock createClock(ProfilerConfig config) { + Clock clock; + + boolean isEventTime = config.getTimestampField().isPresent(); + if(isEventTime) { + String timestampField = config.getTimestampField().get(); + clock = new EventTimeClock(timestampField); + + } else { + throw new IllegalStateException("Expected profiler to use event time."); + } + + return clock; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java new file mode 100644 index 0000000..f1d4114 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/clock/EventTimeOnlyClockFactoryTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.clock; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.junit.Before; +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests the {@link EventTimeOnlyClockFactory}. + */ +public class EventTimeOnlyClockFactoryTest { + + private EventTimeOnlyClockFactory clockFactory; + + @Before + public void setup() { + clockFactory = new EventTimeOnlyClockFactory(); + } + + @Test + public void testCreateEventTimeClock() { + // configure the profiler to use event time + ProfilerConfig config = new ProfilerConfig(); + config.setTimestampField(Optional.of("timestamp")); + + // the factory should return a clock that handles 'event time' + Clock clock = clockFactory.createClock(config); + assertTrue(clock instanceof EventTimeClock); + } + + @Test(expected = IllegalStateException.class) + public void testCreateProcessingTimeClock() { + // the profiler uses processing time by default + ProfilerConfig config = new ProfilerConfig(); + clockFactory.createClock(config); + fail("Expected exception"); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/README.md ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md index 99e8c7e..df143f1 100644 --- a/metron-analytics/metron-profiler-spark/README.md +++ b/metron-analytics/metron-profiler-spark/README.md @@ -245,6 +245,8 @@ You can store both settings for the Profiler along with settings for Spark in th |--- |--- | [`profiler.batch.input.path`](#profilerbatchinputpath) | The path to the input data read by the Batch Profiler. | [`profiler.batch.input.format`](#profilerbatchinputformat) | The format of the input data read by the Batch Profiler. 
+| [`profiler.batch.input.begin`](#profilerbatchinputbegin) | Only messages with a timestamp equal to or after this will be profiled. +| [`profiler.batch.input.end`](#profilerbatchinputend) | Only messages with a timestamp before or equal to this will be profiled. | [`profiler.period.duration`](#profilerperiodduration) | The duration of each profile period. | [`profiler.period.duration.units`](#profilerperioddurationunits) | The units used to specify the [`profiler.period.duration`](#profilerperiodduration). | [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor) | A salt is prepended to the row key to help prevent hot-spotting. @@ -263,6 +265,23 @@ The path to the input data read by the Batch Profiler. The format of the input data read by the Batch Profiler. +### `profiler.batch.input.begin` + +*Default*: undefined; no time constraint + +Only messages with a timestamp equal to or after this will be profiled. The Profiler will only profile messages with a timestamp in [`profiler.batch.input.begin`, `profiler.batch.input.end`] inclusive. + +By default, no time constraint is defined. The value is expected to follow the [ISO-8601 instant format](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT); for example, 2011-12-03T10:15:30Z. + + +### `profiler.batch.input.end` + +*Default*: undefined; no time constraint + +Only messages with a timestamp before or equal to this will be profiled. The Profiler will only profile messages with a timestamp in [`profiler.batch.input.begin`, `profiler.batch.input.end`] inclusive. + +By default, no time constraint is defined. The value is expected to follow the [ISO-8601 instant format](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html#ISO_INSTANT); for example, 2011-12-03T10:15:30Z. 
+ ### `profiler.period.duration` *Default*: 15 http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java index d75abc3..39f8b3a 100644 --- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java @@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.lang.invoke.MethodHandles; -import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END; import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT; import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH; import static org.apache.spark.sql.functions.sum; @@ -51,6 +53,12 @@ public class BatchProfiler implements Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private TimestampParser timestampParser; + + public BatchProfiler() { + this.timestampParser = new TimestampParser(); + } + /** * Execute the Batch Profiler. 
* @@ -69,7 +77,6 @@ public class BatchProfiler implements Serializable { LOG.debug("Building {} profile(s)", profiles.getProfiles().size()); Map<String, String> globals = Maps.fromProperties(globalProperties); - String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class); String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class); LOG.debug("Loading telemetry from '{}'", inputPath); @@ -85,7 +92,7 @@ public class BatchProfiler implements Serializable { // find all routes for each message Dataset<MessageRoute> routes = telemetry - .flatMap(new MessageRouterFunction(profiles, globals), Encoders.bean(MessageRoute.class)); + .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.bean(MessageRoute.class)); LOG.debug("Generated {} message route(s)", routes.cache().count()); // build the profiles @@ -104,4 +111,29 @@ public class BatchProfiler implements Serializable { return count; } + + /** + * Builds the function that performs message routing. + * + * @param profilerProps The profiler configuration properties. + * @param profiles The profile definitions. + * @param globals The Stellar global properties. + * @return A {@link MessageRouterFunction}. 
+ */ + private MessageRouterFunction messageRouterFunction( + Properties profilerProps, + ProfilerConfig profiles, + Map<String, String> globals) { + MessageRouterFunction routerFunction = new MessageRouterFunction(profiles, globals); + + // an optional time constraint to limit how far back to look for telemetry + Optional<Long> beginAt = timestampParser.parse(TELEMETRY_INPUT_BEGIN.get(profilerProps, String.class)); + beginAt.ifPresent(begin -> routerFunction.withBegin(begin)); + + // an optional time constraint to limit the most recent telemetry + Optional<Long> endAt = timestampParser.parse(TELEMETRY_INPUT_END.get(profilerProps, String.class)); + endAt.ifPresent(end -> routerFunction.withEnd(end)); + + return routerFunction; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java index 054806e..e8cd160 100644 --- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java @@ -46,7 +46,11 @@ public enum BatchProfilerConfig { TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class), - TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class); + TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class), + + TELEMETRY_INPUT_BEGIN("profiler.batch.input.begin", "", String.class), + + TELEMETRY_INPUT_END("profiler.batch.input.end", "", 
String.class); /** * The key for the configuration value. http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java new file mode 100644 index 0000000..4be8b3e --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/TimestampParser.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.spark; + +import org.apache.commons.lang3.StringUtils; + +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.Optional; + +/** + * Parses an input string and returns a timestamp in epoch milliseconds. + */ +public class TimestampParser { + + /** + * Parses an input string and returns an optional timestamp in epoch milliseconds. 
+ * + * @param inputString The input defining a timestamp. + * @return A timestamp in epoch milliseconds. + */ + public Optional<Long> parse(String inputString) { + Optional<Long> epochMilli = Optional.empty(); + + // a blank is acceptable and treated as undefined + if (StringUtils.isNotBlank(inputString)) { + epochMilli = Optional.of(new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_INSTANT) + .toFormatter() + .parse(inputString, Instant::from) + .toEpochMilli()); + } + + return epochMilli; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java index cf8029f..31734d0 100644 --- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java @@ -23,7 +23,11 @@ import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.profiler.DefaultMessageRouter; import org.apache.metron.profiler.MessageRoute; import org.apache.metron.profiler.MessageRouter; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.ClockFactory; +import org.apache.metron.profiler.clock.EventTimeOnlyClockFactory; import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.spark.api.java.function.FlatMapFunction; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -54,9 +58,27 @@ public 
class MessageRouterFunction implements FlatMapFunction<String, MessageRou */ private ProfilerConfig profilerConfig; + /** + * A clock that can extract time from the messages received. + */ + private Clock clock; + + /** + * Only messages with a timestamp after this will be routed. + */ + private Long begin; + + /** + * Only messages with a timestamp before this will be routed. + */ + private Long end; + public MessageRouterFunction(ProfilerConfig profilerConfig, Map<String, String> globals) { this.profilerConfig = profilerConfig; this.globals = globals; + this.begin = Long.MIN_VALUE; + this.end = Long.MAX_VALUE; + withClockFactory(new EventTimeOnlyClockFactory()); } /** @@ -70,8 +92,7 @@ public class MessageRouterFunction implements FlatMapFunction<String, MessageRou */ @Override public Iterator<MessageRoute> call(String jsonMessage) throws Exception { - List<MessageRoute> routes; - + List<MessageRoute> routes = Collections.emptyList(); JSONParser parser = new JSONParser(); Context context = TaskUtils.getContext(globals); MessageRouter router = new DefaultMessageRouter(context); @@ -80,20 +101,67 @@ public class MessageRouterFunction implements FlatMapFunction<String, MessageRou Optional<JSONObject> message = toMessage(jsonMessage, parser); if(message.isPresent()) { - // find all routes - routes = router.route(message.get(), profilerConfig, context); - LOG.trace("Found {} route(s) for a message", routes.size()); + // extract the timestamp from the message + Optional<Long> timestampOpt = clock.currentTimeMillis(message.get()); + if (timestampOpt.isPresent()) { + + // timestamp must be in [begin, end] + Long timestamp = timestampOpt.get(); + if(timestamp >= begin && timestamp <= end) { + routes = router.route(message.get(), profilerConfig, context); + LOG.trace("Found {} route(s) for a message", routes.size()); + + } else { + LOG.trace("Ignoring message; timestamp={} not in [{},{}]", timestamp, prettyPrint(begin), prettyPrint(end)); + } + + } else { + LOG.trace("No 
timestamp in message. Message will be ignored."); + } } else { - // the message is not valid and must be ignored - routes = Collections.emptyList(); - LOG.trace("No route possible. Unable to parse message."); + LOG.trace("Unable to parse message. Message will be ignored"); } return routes.iterator(); } /** + * Set a time constraint. + * + * @param begin Only messages with a timestamp equal to or after this will be routed. + * @return The message router function + */ + public MessageRouterFunction withBegin(Long begin) { + this.begin = begin; + return this; + } + + /** + * Set a time constraint. + * + * @param end Only messages with a timestamp before or equal to this will be routed. + * @return The message router function + */ + public MessageRouterFunction withEnd(Long end) { + this.end = end; + return this; + } + + /** + * Defines the {@link ClockFactory} used to create the {@link Clock}. + * + * <p>Calling this method is only needed to override the default behavior. + * + * @param clockFactory The factory to use for creating the {@link Clock}. + * @return The message router function. + */ + public MessageRouterFunction withClockFactory(ClockFactory clockFactory) { + this.clock = clockFactory.createClock(profilerConfig); + return this; + } + + /** * Parses the raw JSON of a message. * * @param json The raw JSON to parse. @@ -110,4 +178,26 @@ public class MessageRouterFunction implements FlatMapFunction<String, MessageRou return Optional.empty(); } } + + /** + * Pretty prints a Long value for use when logging these values. + * + * <p>Long.MIN_VALUE and Long.MAX_VALUE will occur frequently and are difficult to grok in the logs. Instead, + * Long.MIN_VALUE is rendered as "MIN". Long.MAX_VALUE is rendered as "MAX". All other values are rendered + * directly. + * + * @param value The value to pretty print. 
+ * @return + */ + private static String prettyPrint(Long value) { + String result; + if(value == Long.MIN_VALUE) { + result = "MIN"; + } else if(value == Long.MAX_VALUE) { + result = "MAX"; + } else { + result = value.toString(); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java index 87c4246..c33644f 100644 --- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java @@ -38,8 +38,11 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -51,6 +54,8 @@ import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PRO import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY; import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME; import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END; import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT; import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH; import static org.junit.Assert.assertTrue; @@ -60,6 +65,7 @@ import static org.junit.Assert.assertTrue; */ public class BatchProfilerIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** * { * "timestampField": "timestamp", @@ -212,6 +218,54 @@ public class BatchProfilerIntegrationTest { validateProfiles(); } + @Test + public void testBatchProfilerWithEndTimeConstraint() throws Exception { + // the input telemetry is text/json stored in the local filesystem + profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json"); + profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text"); + + // there are 40 messages before "2018-07-07T15:51:48Z" in the test data + profilerProperties.put(TELEMETRY_INPUT_BEGIN.getKey(), ""); + profilerProperties.put(TELEMETRY_INPUT_END.getKey(), "2018-07-07T15:51:48Z"); + + BatchProfiler profiler = new BatchProfiler(); + profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile()); + + // the max timestamp in the data is around July 7, 2018 + assign("maxTimestamp", "1530978728982L"); + + // the 'window' looks up to 5 hours before the max timestamp + assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); + + assertTrue(execute("[12] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class)); + assertTrue(execute("[28] == PROFILE_GET('count-by-ip', '192.168.138.158', window)", Boolean.class)); + assertTrue(execute("[40] == PROFILE_GET('total-count', 'total', window)", Boolean.class)); + } + + @Test + public void testBatchProfilerWithBeginTimeConstraint() throws Exception { + // the input telemetry is text/json stored in the local filesystem + profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), 
"src/test/resources/telemetry.json"); + profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text"); + + // there are 60 messages after "2018-07-07T15:51:48Z" in the test data + profilerProperties.put(TELEMETRY_INPUT_BEGIN.getKey(), "2018-07-07T15:51:48Z"); + profilerProperties.put(TELEMETRY_INPUT_END.getKey(), ""); + + BatchProfiler profiler = new BatchProfiler(); + profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile()); + + // the max timestamp in the data is around July 7, 2018 + assign("maxTimestamp", "1530978728982L"); + + // the 'window' looks up to 5 hours before the max timestamp + assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); + + assertTrue(execute("[14] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class)); + assertTrue(execute("[46] == PROFILE_GET('count-by-ip', '192.168.138.158', window)", Boolean.class)); + assertTrue(execute("[60] == PROFILE_GET('total-count', 'total', window)", Boolean.class)); + } + /** * Validates the profiles that were built. * @@ -263,6 +317,8 @@ public class BatchProfilerIntegrationTest { * @return The result of executing the Stellar expression. 
*/ private <T> T execute(String expression, Class<T> clazz) { - return executor.execute(expression, Collections.emptyMap(), clazz); + T results = executor.execute(expression, Collections.emptyMap(), clazz); + LOG.debug("{} = {}", expression, results); + return results; } } http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java new file mode 100644 index 0000000..f760b35 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/TimestampParserTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.spark; + +import org.junit.Before; +import org.junit.Test; + +import java.time.format.DateTimeParseException; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TimestampParserTest { + + private TimestampParser parser; + + @Before + public void setup() { + parser = new TimestampParser(); + } + + @Test + public void testEmpty() { + Optional<Long> millis = parser.parse(""); + assertFalse(millis.isPresent()); + } + + @Test + public void testBlank() { + Optional<Long> millis = parser.parse(" "); + assertFalse(millis.isPresent()); + } + + @Test + public void testIsoInstantFormat() { + // ISO-8601 instant format + Optional<Long> millis = parser.parse("2011-12-03T10:15:30Z"); + assertTrue(millis.isPresent()); + assertEquals(1322907330000L, millis.get().longValue()); + } + + @Test(expected = DateTimeParseException.class) + public void testInvalidFormat() { + parser.parse("1537502400000"); + fail("Expected exception"); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/fa3be8d3/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java index ceaa7cd..9a2cbf4 100644 --- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java +++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/MessageRouterFunctionTest.java @@ -31,14 +31,17 @@ import 
java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Tests the {@link MessageRouterFunction}. */ public class MessageRouterFunctionTest { + private Long messageTimestamp = 1537468508360L; + /** - * { "ip_src_addr": "192.168.1.22" } + * { "ip_src_addr": "192.168.1.22", "timestamp": 1537468508360 } */ @Multiline private String goodMessage; @@ -48,9 +51,15 @@ public class MessageRouterFunctionTest { */ private String badMessage; + /** + * { "ip_src_addr": "192.168.1.22" } + */ + @Multiline + private String messageNoTimestamp; + @Test public void testFindRoutes() throws Exception { - MessageRouterFunction function = new MessageRouterFunction(oneProfile(), getGlobals()); + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()); Iterator<MessageRoute> iter = function.call(goodMessage); List<MessageRoute> routes = Lists.newArrayList(iter); @@ -58,14 +67,20 @@ public class MessageRouterFunctionTest { Assert.assertEquals("profile1", routes.get(0).getProfileDefinition().getProfile()); } - /** - * A bad or invalid message should return no routes. 
- */ + @Test(expected = IllegalStateException.class) + public void testWithSystemTime() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profileWithSystemTime(), getGlobals()); + Iterator<MessageRoute> iter = function.call(goodMessage); + + Assert.fail("Exception expected as system time is not supported."); + } + @Test public void testWithBadMessage() throws Exception { - MessageRouterFunction function = new MessageRouterFunction(oneProfile(), getGlobals()); + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()); Iterator<MessageRoute> iter = function.call(badMessage); + // an invalid message should return no routes List<MessageRoute> routes = Lists.newArrayList(iter); Assert.assertEquals(0, routes.size()); } @@ -81,7 +96,88 @@ public class MessageRouterFunctionTest { Assert.assertEquals("profile2", routes.get(1).getProfileDefinition().getProfile()); } - private ProfilerConfig oneProfile() { + @Test + public void testWithNoTimestampInMessage() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()); + Iterator<MessageRoute> iter = function.call(messageNoTimestamp); + + // with no timestamp, the message should be ignored + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(0, routes.size()); + } + + @Test + public void testMessageFilteredByBegin() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withBegin(messageTimestamp + 1000); + Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should be filtered because it is before `beginAt` + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(0, routes.size()); + } + + @Test + public void testMessageNotFilteredByBegin() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withBegin(messageTimestamp - 1000); + 
Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should NOT be filtered because it is after 'beginAt' + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(1, routes.size()); + } + + @Test + public void testMessageFilteredByEnd() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withEnd(messageTimestamp - 1000); + Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should be filtered because it is after 'endAt' + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(0, routes.size()); + } + + @Test + public void testMessageNotFilteredByEnd() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withEnd(messageTimestamp + 1000); + Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should NOT be filtered because it is before 'endAt' + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(1, routes.size()); + } + + @Test + public void testMessageFilteredByBeginAndEnd() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withBegin(messageTimestamp + 1000) + .withEnd(messageTimestamp + 2000); + Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should be filtered because it is outside of [beginAt, endAt] + List<MessageRoute> routes = Lists.newArrayList(iter); + Assert.assertEquals(0, routes.size()); + } + + @Test + public void testMessageNotFilteredByBeginAndEnd() throws Exception { + MessageRouterFunction function = new MessageRouterFunction(profile(), getGlobals()) + .withBegin(messageTimestamp - 1000) + .withEnd(messageTimestamp + 1000); + Iterator<MessageRoute> iter = function.call(goodMessage); + + // the message should NOT be filtered because it is within [beginAt, endAt] + List<MessageRoute> routes = Lists.newArrayList(iter); +
Assert.assertEquals(1, routes.size()); + } + + /** + * Creates a profiler definition, using event time, and containing one profile. + */ + private ProfilerConfig profile() { ProfileConfig profile = new ProfileConfig() .withProfile("profile1") .withForeach("ip_src_addr") @@ -89,9 +185,13 @@ public class MessageRouterFunctionTest { .withResult("count"); return new ProfilerConfig() - .withProfile(profile); + .withProfile(profile) + .withTimestampField(Optional.of("timestamp")); } + /** + * Creates a profiler definition, using event time, and containing two profiles. + */ private ProfilerConfig twoProfiles() { ProfileConfig profile1 = new ProfileConfig() .withProfile("profile1") @@ -105,7 +205,22 @@ public class MessageRouterFunctionTest { .withResult("count"); return new ProfilerConfig() .withProfile(profile1) - .withProfile(profile2); + .withProfile(profile2) + .withTimestampField(Optional.of("timestamp")); + } + + /** + * Creates a profiler definition using system time. + */ + private ProfilerConfig profileWithSystemTime() { + ProfileConfig profile = new ProfileConfig() + .withProfile("profile1") + .withForeach("ip_src_addr") + .withUpdate("count", "count + 1") + .withResult("count"); + + return new ProfilerConfig() + .withProfile(profile); } private Map<String, String> getGlobals() {