Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1150#discussion_r209771180
--- Diff:
metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
---
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageDistributor;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Comparator.comparing;
+import static
org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static
org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * The function responsible for building profiles in Spark.
+ */
+public class ProfileBuilderFunction implements MapGroupsFunction<String,
MessageRoute, ProfileMeasurementAdapter> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private long periodDurationMillis;
+ private Map<String, String> globals;
+
+ public ProfileBuilderFunction(Properties properties, Map<String, String>
globals) {
+ TimeUnit periodDurationUnits =
TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+ int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+ this.periodDurationMillis =
periodDurationUnits.toMillis(periodDuration);
+ this.globals = globals;
+ }
+
+ /**
+ * Build a profile from a set of message routes.
+ *
+ * <p>This assumes that all of the necessary routes have been provided
+ *
+ * @param group The group identifier.
+ * @param iterator The message routes.
+ * @return
+ */
+ @Override
+ public ProfileMeasurementAdapter call(String group,
Iterator<MessageRoute> iterator) throws Exception {
+ // create the distributor; some settings are unnecessary because it is
cleaned-up immediately after processing the batch
+ int maxRoutes = Integer.MAX_VALUE;
+ long profileTTLMillis = Long.MAX_VALUE;
+ MessageDistributor distributor = new
DefaultMessageDistributor(periodDurationMillis, profileTTLMillis, maxRoutes);
+ Context context = TaskUtils.getContext(globals);
+
+ // sort the messages/routes
+ List<MessageRoute> routes = toStream(iterator)
+ .sorted(comparing(rt -> rt.getTimestamp()))
+ .collect(Collectors.toList());
+ LOG.debug("Building a profile for group '{}' from {} message(s)",
group, routes.size());
+
+ // apply each message/route to build the profile
+ for(MessageRoute route: routes) {
+ distributor.distribute(route, context);
+ }
--- End diff --
Or maybe we do not need to apply the messages in timestamp order? There are
really no strict guarantees of ordering when the Profiler runs in Storm. Hmm.
---