[ https://issues.apache.org/jira/browse/METRON-1460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388690#comment-16388690 ]
ASF GitHub Bot commented on METRON-1460:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/940#discussion_r172694248

    --- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java ---
    @@ -0,0 +1,281 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.enrichment.parallel;
    +
    +import com.github.benmanes.caffeine.cache.stats.CacheStats;
    +import org.apache.metron.common.Constants;
    +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
    +import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
    +import org.apache.metron.common.performance.PerformanceLogger;
    +import org.apache.metron.common.utils.MessageUtils;
    +import org.apache.metron.enrichment.bolt.CacheKey;
    +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
    +import org.apache.metron.enrichment.utils.EnrichmentUtils;
    +import org.json.simple.JSONObject;
    +
    +import java.util.AbstractMap;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.EnumMap;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ExecutionException;
    +import java.util.function.BinaryOperator;
    +import java.util.function.Supplier;
    +
    +/**
    + * This is an independent component which will accept a message and a set of enrichment adapters as well as a config which defines
    + * how those enrichments should be performed and fully enrich the message.  The result will be the enriched message
    + * unified together and a list of errors which happened.
    + */
    +public class ParallelEnricher {
    +
    +  private Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
    +  private EnumMap<EnrichmentStrategies, CacheStats> cacheStats = new EnumMap<>(EnrichmentStrategies.class);
    +
    +  /**
    +   * The result of an enrichment.
    +   */
    +  public static class EnrichmentResult {
    +    private JSONObject result;
    +    private List<Map.Entry<Object, Throwable>> enrichmentErrors;
    +
    +    public EnrichmentResult(JSONObject result, List<Map.Entry<Object, Throwable>> enrichmentErrors) {
    +      this.result = result;
    +      this.enrichmentErrors = enrichmentErrors;
    +    }
    +
    +    /**
    +     * The unified fully enriched result.
    +     * @return
    +     */
    +    public JSONObject getResult() {
    +      return result;
    +    }
    +
    +    /**
    +     * The errors that happened in the course of enriching.
    +     * @return
    +     */
    +    public List<Map.Entry<Object, Throwable>> getEnrichmentErrors() {
    +      return enrichmentErrors;
    +    }
    +  }
    +
    +  private ConcurrencyContext concurrencyContext;
    +
    +  /**
    +   * Construct a parallel enricher with a set of enrichment adapters associated with their enrichment types.
    +   * @param enrichmentsByType
    +   */
    +  public ParallelEnricher( Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType
    +                         , ConcurrencyContext concurrencyContext
    +                         , boolean logStats
    +                         )
    +  {
    +    this.enrichmentsByType = enrichmentsByType;
    +    this.concurrencyContext = concurrencyContext;
    +    if(logStats) {
    +      for(EnrichmentStrategies s : EnrichmentStrategies.values()) {
    +        cacheStats.put(s, null);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Fully enriches a message.  Each enrichment is done in parallel via a threadpool.
    +   * Each enrichment is fronted with a LRU cache.
    +   *
    +   * @param message the message to enrich
    +   * @param strategy The enrichment strategy to use (e.g. enrichment or threat intel)
    +   * @param config The sensor enrichment config
    +   * @param perfLog The performance logger.  We log the performance for this call, the split portion and the enrichment portion.
    +   * @return the enrichment result
    +   */
    +  public EnrichmentResult apply( JSONObject message
    +                               , EnrichmentStrategies strategy
    +                               , SensorEnrichmentConfig config
    +                               , PerformanceLogger perfLog
    +                               ) throws ExecutionException, InterruptedException {
    +    if(message == null) {
    +      return null;
    +    }
    +    if(perfLog != null) {
    +      perfLog.mark("execute");
    +      if(perfLog.isDebugEnabled() && !cacheStats.isEmpty()) {
    +        CacheStats before = cacheStats.get(strategy);
    +        CacheStats after = concurrencyContext.getCache().stats();
    +        if(before != null && after != null) {
    +          CacheStats delta = after.minus(before);
    +          perfLog.log("cache", delta.toString());
    +        }
    +        cacheStats.put(strategy, after);
    +      }
    +    }
    +    String sensorType = MessageUtils.getSensorType(message);
    +    message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
    --- End diff --

    I would expect to see this in the split/join bolts.  When this is used in the unified topology, we're still going to report a split/join time?


> Create a complementary non-split-join enrichment topology
> ---------------------------------------------------------
>
>                 Key: METRON-1460
>                 URL: https://issues.apache.org/jira/browse/METRON-1460
>             Project: Metron
>          Issue Type: New Feature
>            Reporter: Casey Stella
>            Priority: Major
>
> There are some deficiencies to the split/join topology.
> * It's hard to reason about
> * Understanding the latency of enriching a message requires looking at multiple bolts that each give summary statistics
> * The join bolt's cache is really hard to reason about when performance tuning
> * During spikes in traffic, you can overload the join bolt's cache and drop messages if you aren't careful
> * In general, it's hard to associate a cache size and a duration kept in cache with throughput and latency
> * There are a lot of network hops per message
> * Right now we are stuck at 2 stages of transformations being done (enrichment and threat intel).
>   It's very possible that you might want stellar enrichments to depend on the output of other stellar enrichments. In order to implement this in split/join you'd have to create a cycle in the storm topology
>
> I propose that we move to a model where we do enrichments in a single bolt in parallel using a static threadpool (e.g. multiple workers in the same process would share the threadpool). In all other ways, this would be backwards compatible. A transparent drop-in for the existing enrichment topology.
> There are some pros/cons about this too:
> * Pro
> ** Easier to reason about from an individual message perspective
> ** Architecturally decoupled from Storm
> *** This sets us up if we want to consider other streaming technologies
> ** Fewer bolts
> *** spout -> enrichment bolt -> threatintel bolt -> output bolt
> ** Way fewer network hops per message
> *** currently 2n+1 where n is the number of enrichments used (if using stellar subgroups, each subgroup is a hop)
> ** Easier to reason about from a performance perspective
> *** We trade cache size and eviction timeout for threadpool size
> ** We set ourselves up to have stellar subgroups with dependencies
> *** i.e. stellar subgroups that depend on the output of other subgroups
> *** If we do this, we can shrink the topology to just spout -> enrichment/threat intel -> output
> * Con
> ** We can no longer tune stellar enrichments independent from HBase enrichments
> *** To be fair, with enrichments moving to stellar, this is the case in the split/join approach too
> ** No idea about performance
>
> What I propose is to submit a PR that will deliver an alternative, completely backwards compatible topology for enrichment that you can use by adjusting the start_enrichment_topology.sh script to use remote-unified.yaml instead of remote.yaml. If we live with it for a while and have some good experiences with it, maybe we can consider retiring the old enrichment topology.
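
For readers following the proposal, here is a minimal sketch of the "single bolt, static threadpool" idea using only the JDK. It is not the ParallelEnricher from the pull request: the class name UnifiedEnrichmentSketch, the pool size of 4, and the use of plain maps and functions in place of JSONObject and EnrichmentAdapter are all assumptions made for illustration. Each enrichment is submitted to one JVM-wide pool, the results are merged into a copy of the message, and failures are collected rather than thrown, which mirrors the result-plus-errors shape of EnrichmentResult in the diff.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch; names and sizes are illustrative, not taken from Metron. */
final class UnifiedEnrichmentSketch {

  // One pool per JVM, so every caller in the same process shares these threads.
  // The size (4) is an arbitrary assumption. Daemon threads let the JVM exit
  // without an explicit shutdown.
  private static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
    Thread t = new Thread(r, "enrichment-sketch");
    t.setDaemon(true);
    return t;
  });

  /** The merged message plus the errors raised by individual enrichments. */
  static final class Result {
    final Map<String, Object> message;
    final List<Map.Entry<String, Throwable>> errors;

    Result(Map<String, Object> message, List<Map.Entry<String, Throwable>> errors) {
      this.message = message;
      this.errors = errors;
    }
  }

  static Result enrich(
      Map<String, Object> message,
      Map<String, Function<Map<String, Object>, Map<String, Object>>> enrichments) {
    // Enrichments read an immutable snapshot so they cannot race on the input.
    Map<String, Object> snapshot =
        Collections.unmodifiableMap(new LinkedHashMap<>(message));

    // Submit every enrichment before waiting on any of them.
    Map<String, CompletableFuture<Map<String, Object>>> pending = new LinkedHashMap<>();
    enrichments.forEach((name, fn) ->
        pending.put(name, CompletableFuture.supplyAsync(() -> fn.apply(snapshot), POOL)));

    // Join in submission order; a failed enrichment is recorded, not rethrown.
    Map<String, Object> merged = new LinkedHashMap<>(message);
    List<Map.Entry<String, Throwable>> errors = new ArrayList<>();
    pending.forEach((name, future) -> {
      try {
        merged.putAll(future.join());
      } catch (CompletionException e) {
        errors.add(new AbstractMap.SimpleImmutableEntry<>(name, e.getCause()));
      }
    });
    return new Result(merged, errors);
  }

  public static void main(String[] args) {
    Map<String, Object> message = new LinkedHashMap<>();
    message.put("ip_src_addr", "10.0.0.1");

    Map<String, Function<Map<String, Object>, Map<String, Object>>> enrichments =
        new LinkedHashMap<>();
    enrichments.put("geo", m -> Collections.singletonMap("geo.country", "US"));
    enrichments.put("host", m -> { throw new IllegalStateException("lookup failed"); });

    Result result = enrich(message, enrichments);
    System.out.println(result.message);
    System.out.println(result.errors);
  }
}
```

In this sketch the only concurrency knob is the pool size, which is the trade the description makes against cache size and eviction timeout. A real implementation would also need the per-enrichment cache and the per-sensor configuration that the diff handles.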