This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 0cf78d2 METRON-1974 Batch Profiler Should Handle Errant Profiles Better (nickwallen) closes apache/metron#1326
0cf78d2 is described below
commit 0cf78d223f24eaf7112bfb7fb52d8778f8a1a2fa
Author: nickwallen <[email protected]>
AuthorDate: Tue Feb 12 12:25:29 2019 -0500
METRON-1974 Batch Profiler Should Handle Errant Profiles Better (nickwallen) closes apache/metron#1326
---
.../spark/function/GroupByPeriodFunction.java | 50 ++++++++++++++++-
.../spark/function/ProfileBuilderFunction.java | 31 ++++++++---
.../spark/BatchProfilerIntegrationTest.java | 59 +++++++++++++++-----
.../spark/function/GroupByPeriodFunctionTest.java | 63 ++++++++++++++++++++++
.../spark/function/ProfileBuilderFunctionTest.java | 59 ++++++++++++++++----
5 files changed, 229 insertions(+), 33 deletions(-)
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
index 1b602f4..d06f7e9 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
@@ -47,6 +47,8 @@ public class GroupByPeriodFunction implements MapFunction<MessageRoute, String>
*/
private TimeUnit periodDurationUnits;
+ private static final String SEPARATOR = "__";
+
public GroupByPeriodFunction(Properties profilerProperties) {
periodDurationUnits =
TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties, String.class));
periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class);
@@ -55,6 +57,52 @@ public class GroupByPeriodFunction implements MapFunction<MessageRoute, String>
@Override
public String call(MessageRoute route) {
ProfilePeriod period = ProfilePeriod.fromTimestamp(route.getTimestamp(),
periodDuration, periodDurationUnits);
- return route.getProfileDefinition().getProfile() + "-" + route.getEntity()
+ "-" + period.getPeriod();
+ return new StringBuilder()
+ .append(route.getProfileDefinition().getProfile())
+ .append(SEPARATOR)
+ .append(route.getEntity())
+ .append(SEPARATOR)
+ .append(period.getPeriod())
+ .toString();
+ }
+
+ /**
+ * @param groupKey The group key used to group {@link MessageRoute}s.
+ * @return The name of the profile.
+ */
+ public static String profileFromKey(String groupKey) {
+ String[] pieces = groupKey.split(SEPARATOR);
+ if(pieces.length == 3) {
+ return pieces[0];
+ } else {
+ return "unknown";
+ }
+ }
+
+ /**
+ * @param groupKey The group key used to group {@link MessageRoute}s.
+ * @return The name of the entity.
+ */
+ public static String entityFromKey(String groupKey) {
+ String[] pieces = groupKey.split(SEPARATOR);
+ if(pieces.length == 3) {
+ return pieces[1];
+ } else {
+ return "unknown";
+ }
}
+
+ /**
+ * @param groupKey The group key used to group {@link MessageRoute}s.
+ * @return The period identifier.
+ */
+ public static String periodFromKey(String groupKey) {
+ String[] pieces = groupKey.split(SEPARATOR);
+ if(pieces.length == 3) {
+ return pieces[2];
+ } else {
+ return "unknown";
+ }
+ }
+
}
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
index 273695b..7283b48 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -39,9 +39,13 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static
org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
import static
org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+import static
org.apache.metron.profiler.spark.function.GroupByPeriodFunction.entityFromKey;
+import static
org.apache.metron.profiler.spark.function.GroupByPeriodFunction.periodFromKey;
+import static
org.apache.metron.profiler.spark.function.GroupByPeriodFunction.profileFromKey;
/**
* The function responsible for building profiles in Spark.
@@ -70,7 +74,7 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
* @return
*/
@Override
- public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute>
iterator) throws Exception {
+ public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute>
iterator) {
// create the distributor; some settings are unnecessary because it is
cleaned-up immediately after processing the batch
int maxRoutes = Integer.MAX_VALUE;
long profileTTLMillis = Long.MAX_VALUE;
@@ -89,15 +93,28 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
}
// flush the profile
+ ProfileMeasurementAdapter result;
List<ProfileMeasurement> measurements = distributor.flush();
- if(measurements.size() > 1) {
- throw new IllegalStateException("No more than 1 profile measurement is
expected");
+ if(measurements.size() == 1) {
+ ProfileMeasurement m = measurements.get(0);
+ result = new ProfileMeasurementAdapter(m);
+ LOG.debug("Profile measurement created; profile={}, entity={},
period={}, value={}",
+ m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(),
m.getProfileValue());
+
+ } else if(measurements.size() == 0) {
+ String msg = format("No profile measurement can be calculated. Review
the profile for bugs. profile=%s, entity=%s, period=%s",
+ profileFromKey(group), entityFromKey(group),
periodFromKey(group));
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+
+ } else {
+ String msg = format("Expected 1 profile measurement, but got %d.
profile=%s, entity=%s, period=%s",
+ measurements.size(), profileFromKey(group),
entityFromKey(group), periodFromKey(group));
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
}
- ProfileMeasurement m = measurements.get(0);
- LOG.debug("Profile measurement created; profile={}, entity={}, period={},
value={}",
- m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(),
m.getProfileValue());
- return new ProfileMeasurementAdapter(m);
+ return result;
}
private static <T> Stream<T> toStream(Iterator<T> iterator) {
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 83800af..9ea151a 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -20,7 +20,6 @@
package org.apache.metron.profiler.spark;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
@@ -30,6 +29,7 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
@@ -41,13 +41,13 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static
org.apache.metron.common.configuration.profiler.ProfilerConfig.fromJSON;
import static
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
import static
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
@@ -59,10 +59,11 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
import static
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
import static
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
import static
org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_READER;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
import static org.junit.Assert.assertTrue;
-import static org.apache.metron.profiler.spark.reader.TelemetryReaders.*;
-
/**
* An integration test for the {@link BatchProfiler}.
*/
@@ -166,7 +167,7 @@ public class BatchProfilerIntegrationTest {
profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(),
"src/test/resources/telemetry.json");
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
validateProfiles();
}
@@ -188,7 +189,7 @@ public class BatchProfilerIntegrationTest {
profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
validateProfiles();
}
@@ -210,7 +211,7 @@ public class BatchProfilerIntegrationTest {
profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), inputPath);
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
validateProfiles();
}
@@ -239,7 +240,7 @@ public class BatchProfilerIntegrationTest {
readerProperties.put("header", "true");
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
validateProfiles();
}
@@ -255,7 +256,7 @@ public class BatchProfilerIntegrationTest {
profilerProperties.put(TELEMETRY_INPUT_END.getKey(),
"2018-07-07T15:51:48Z");
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
// the max timestamp in the data is around July 7, 2018
assign("maxTimestamp", "1530978728982L");
@@ -279,7 +280,7 @@ public class BatchProfilerIntegrationTest {
profilerProperties.put(TELEMETRY_INPUT_END.getKey(), "");
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(profileJson));
// the max timestamp in the data is around July 7, 2018
assign("maxTimestamp", "1530978728982L");
@@ -293,6 +294,40 @@ public class BatchProfilerIntegrationTest {
}
/**
+ * {
+ * "timestampField": "timestamp",
+ * "profiles": [
+ * {
+ * "profile": "count-by-ip",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": 0 },
+ * "update": { "count" : "count + 1" },
+ * "result": "count"
+ * },
+ * {
+ * "profile": "invalid-profile",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "INVALID_FUNCTION(count)"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private static String invalidProfileJson;
+
+ @Test(expected = SparkException.class)
+ public void testBatchProfilerWithInvalidProfile() throws Exception {
+ profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), JSON.toString());
+ profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(),
"src/test/resources/telemetry.json");
+
+ // the batch profiler should error out, if there is a bug in *any* of the
profiles
+ BatchProfiler profiler = new BatchProfiler();
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties,
fromJSON(invalidProfileJson));
+ }
+
+ /**
* Validates the profiles that were built.
*
* These tests use the Batch Profiler to seed two profiles with archived
telemetry. The first profile
@@ -316,10 +351,6 @@ public class BatchProfilerIntegrationTest {
assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)",
Boolean.class));
}
- private ProfilerConfig getProfile() throws IOException {
- return ProfilerConfig.fromJSON(profileJson);
- }
-
private Properties getGlobals() {
return new Properties();
}
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java
new file mode 100644
index 0000000..78960c4
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunctionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class GroupByPeriodFunctionTest {
+
+ /**
+ * {
+ * "profile": "my-profile-name",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ */
+ @Multiline
+ private String profileJSON;
+
+ @Test
+ public void shouldDecodeGroupKey() throws Exception {
+ final ProfileConfig profile = ProfileConfig.fromJSON(profileJSON);
+ final Long timestamp = System.currentTimeMillis();
+ final String entity = "192.168.1.1";
+ final JSONObject message = new JSONObject();
+ final String periodId = new Long(ProfilePeriod.fromTimestamp(timestamp,
15, TimeUnit.MINUTES).getPeriod()).toString();
+
+ MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
+ String groupKey = new GroupByPeriodFunction(new Properties()).call(route);
+
+ // should be able to extract the profile, entity and period from the group
key
+ Assert.assertEquals("my-profile-name",
GroupByPeriodFunction.profileFromKey(groupKey));
+ Assert.assertEquals(entity, GroupByPeriodFunction.entityFromKey(groupKey));
+ Assert.assertEquals(periodId,
GroupByPeriodFunction.periodFromKey(groupKey));
+ }
+
+}
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
index d5a4dba..9e1e0b3 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
@@ -19,6 +19,7 @@
*/
package org.apache.metron.profiler.spark.function;
+import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.profiler.MessageRoute;
import org.apache.metron.profiler.ProfilePeriod;
@@ -39,13 +40,25 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATI
public class ProfileBuilderFunctionTest {
+ /**
+ * {
+ * "profile": "total-count",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ */
+ @Multiline
+ private String profileJSON;
+
@Test
- public void testBuildProfile() throws Exception {
+ public void shouldBuildProfileMeasurement() throws Exception {
// setup the message and profile
JSONObject message = getMessage();
String entity = "192.168.1.1";
long timestamp = (Long) message.get("timestamp");
- ProfileConfig profile = getProfile();
+ ProfileConfig profile = ProfileConfig.fromJSON(profileJSON);
// setup the route
MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
@@ -71,6 +84,39 @@ public class ProfileBuilderFunctionTest {
Assert.assertEquals(expectedPeriod.getPeriod(), (long)
measurement.getPeriodId());
}
+ /**
+ * {
+ * "profile": "total-count",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "INVALID_FUNCTION(count)"
+ * }
+ */
+ @Multiline
+ private static String invalidProfileJson;
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowExceptionIfInvalidProfile() throws Exception {
+ // setup the message and profile
+ JSONObject message = getMessage();
+ String entity = "192.168.1.1";
+ long timestamp = (Long) message.get("timestamp");
+ ProfileConfig profile = ProfileConfig.fromJSON(invalidProfileJson);
+
+ // setup the route
+ MessageRoute route = new MessageRoute(profile, entity, message, timestamp);
+ List<MessageRoute> routes = new ArrayList();
+ routes.add(route);
+ routes.add(route);
+ routes.add(route);
+ Properties profilerProperties = getProfilerProperties();
+
+ // an exception should be thrown, if there is a bug in the profile
definition
+ ProfileBuilderFunction function = new
ProfileBuilderFunction(profilerProperties, getGlobals());
+ ProfileMeasurementAdapter measurement =
function.call("profile1-192.168.1.1-0", routes.iterator());
+ }
+
private JSONObject getMessage() {
JSONObject message = new JSONObject();
message.put("ip_src_addr", "192.168.1.1");
@@ -86,13 +132,4 @@ public class ProfileBuilderFunctionTest {
private Map<String, String> getGlobals() {
return Collections.emptyMap();
}
-
- private ProfileConfig getProfile() {
- return new ProfileConfig()
- .withProfile("profile1")
- .withForeach("ip_src_addr")
- .withUpdate("count", "count + 1")
- .withResult("count");
-
- }
}