[ 
https://issues.apache.org/jira/browse/METRON-706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858332#comment-15858332
 ] 

ASF GitHub Bot commented on METRON-706:
---------------------------------------

Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100131091
  
    --- Diff: 
metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
 ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.dataloads.extractor;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.log4j.Logger;
    +import org.apache.metron.common.configuration.ConfigurationsUtils;
    +import org.apache.metron.common.dsl.Context;
    +import org.apache.metron.common.dsl.MapVariableResolver;
    +import org.apache.metron.common.dsl.StellarFunctions;
    +import org.apache.metron.common.stellar.StellarPredicateProcessor;
    +import org.apache.metron.common.stellar.StellarProcessor;
    +import org.apache.metron.common.utils.ConversionUtils;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.enrichment.lookup.LookupKV;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.*;
    +
    +import static 
org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
    +
    +public class TransformFilterExtractorDecorator extends ExtractorDecorator {
    +  private static final Logger LOG = 
Logger.getLogger(TransformFilterExtractorDecorator.class);
    +
    +  protected enum ExtractorOptions {
    +    VALUE_TRANSFORM("value_transform"),
    +    VALUE_FILTER("value_filter"),
    +    INDICATOR_TRANSFORM("indicator_transform"),
    +    INDICATOR_FILTER("indicator_filter"),
    +    ZK_QUORUM("zk_quorum"),
    +    INDICATOR("indicator");
    +
    +    private String key;
    +
    +    ExtractorOptions(String key) {
    +      this.key = key;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return key;
    +    }
    +
    +    public boolean existsIn(Map<String, Object> config) {
    +      return config.containsKey(key);
    +    }
    +  }
    +
    +  private Optional<CuratorFramework> zkClient;
    +  private Map<String, String> valueTransforms;
    +  private Map<String, String> indicatorTransforms;
    +  private String valueFilter;
    +  private String indicatorFilter;
    +  private Context stellarContext;
    +  private StellarProcessor transformProcessor;
    +  private StellarPredicateProcessor filterProcessor;
    +  private Map<String, Object> globalConfig;
    +
    +  public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
    +    super(decoratedExtractor);
    +    this.zkClient = Optional.empty();
    +    this.valueTransforms = new LinkedHashMap<>();
    +    this.indicatorTransforms = new LinkedHashMap<>();
    +    this.valueFilter = "";
    +    this.indicatorFilter = "";
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  @Override
    +  public void initialize(Map<String, Object> config) {
    +    super.initialize(config);
    +    if (VALUE_TRANSFORM.existsIn(config)) {
    +      this.valueTransforms = getTransforms(config, 
VALUE_TRANSFORM.toString());
    +    }
    +    if (INDICATOR_TRANSFORM.existsIn(config)) {
    +      this.indicatorTransforms = getTransforms(config, 
INDICATOR_TRANSFORM.toString());
    +    }
    +    if (VALUE_FILTER.existsIn(config)) {
    +      this.valueFilter = getFilter(config, VALUE_FILTER.toString());
    +    }
    +    if (INDICATOR_FILTER.existsIn(config)) {
    +      this.indicatorFilter = getFilter(config, 
INDICATOR_FILTER.toString());
    +    }
    +    String zkClientUrl = "";
    +    if (ZK_QUORUM.existsIn(config)) {
    +      zkClientUrl = 
ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
    +    }
    +    zkClient = setupClient(zkClient, zkClientUrl);
    +    this.globalConfig = getGlobalConfig(zkClient);
    +    this.stellarContext = createContext(zkClient);
    +    StellarFunctions.initialize(stellarContext);
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  private String getFilter(Map<String, Object> config, String valueFilter) 
{
    +    return ConversionUtils.convertOrFail(config.get(valueFilter), 
String.class);
    +  }
    +
    +  /**
    +   * Get a map of the transformations from the config of the specified type
    +   * @param config main config map
    +   * @param type the transformation type to get from config
    +   * @return map of transformations.
    +   */
    +  private Map<String, String> getTransforms(Map<String, Object> config, 
String type) {
    +    Map<Object, Object> transformsConfig = 
ConversionUtils.convertOrFail(config.get(type), Map.class);
    +    Map<String, String> transforms = new LinkedHashMap<>();
    +    for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
    +      String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
    +      String val = ConversionUtils.convertOrFail(e.getValue(), 
String.class);
    +      transforms.put(key, val);
    +    }
    +    return transforms;
    +  }
    +
    +  /**
    +   * Creates a Zookeeper client if it doesn't exist and a url for zk is 
provided.
    +   * @param zookeeperUrl The Zookeeper URL.
    +   */
    +  private Optional<CuratorFramework> 
setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
    +    // can only create client if we have a valid zookeeper URL
    +    if (!zkClient.isPresent()) {
    +      if (StringUtils.isNotBlank(zookeeperUrl)) {
    +        CuratorFramework client = 
ConfigurationsUtils.getClient(zookeeperUrl);
    +        client.start();
    +        return Optional.of(client);
    +      } else {
    +        LOG.warn("Unable to setup zookeeper client - zk_quorum url not 
provided. **This will limit some Stellar functionality**");
    +        return Optional.empty();
    +      }
    +    } else {
    +      return zkClient;
    +    }
    +  }
    +
    +  private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> 
zkClient) {
    +    if (zkClient.isPresent()) {
    +      try {
    +        return JSONUtils.INSTANCE.load(
    +                new 
ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
    +                new TypeReference<Map<String, Object>>() {
    +                });
    +      } catch (Exception e) {
    +        LOG.warn("Exception thrown while attempting to get global config 
from Zookeeper.", e);
    +      }
    +    }
    +    return new LinkedHashMap<>();
    +  }
    +
    +  private Context createContext(Optional<CuratorFramework> zkClient) {
    +    Context.Builder builder = new Context.Builder();
    +    if (zkClient.isPresent()) {
    +      builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
    +              .with(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
    --- End diff --
    
    I mean `{}`


> Add Stellar transformations and filters to enrichment and threat intel loaders
> ------------------------------------------------------------------------------
>
>                 Key: METRON-706
>                 URL: https://issues.apache.org/jira/browse/METRON-706
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Michael Miklavcic
>            Assignee: Michael Miklavcic
>
> This Jira tracks work to add the ability to transform and filter data being 
> loaded into the enrichment and threatintel HBase tables.
> This effort builds on the work in:
> https://issues.apache.org/jira/browse/METRON-678
> and
> https://issues.apache.org/jira/browse/METRON-682



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to