[ 
https://issues.apache.org/jira/browse/EAGLE-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15043778#comment-15043778
 ] 

ASF GitHub Bot commented on EAGLE-2:
------------------------------------

Github user haoch commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/8#discussion_r46768435
  
    --- Diff: 
eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
 ---
    @@ -37,34 +44,72 @@
     public class HdfsAuditLogProcessorMain {
        private static final Logger LOG = 
LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
     
    -   public static void main(String[] args) throws Exception{
    -        Config config = new ConfigOptionParser().load(args);
    +    public static PartitionStrategy createStrategy(Config config) {
    +        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + 
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
    +        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + 
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
    +        String username = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
    +        String password = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
    +        String topic = config.getString("dataSourceConfig.topic");
    +        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, 
username, password, topic);
    +        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
    +        String key1 = EagleConfigConstants.EAGLE_PROPS + 
".partitionRefreshIntervalInMin";
    +        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? 
config.getInt(key1) : 60;
    +        String key2 = EagleConfigConstants.EAGLE_PROPS + 
".kafkaStatisticRangeInMin";
    +        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? 
config.getInt(key2) : 60;
    +        PartitionStrategy strategy = new PartitionStrategyImpl(dao, 
algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, 
kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
    +        return strategy;
    +    }
     
    -        LOG.info("Config class: " + config.getClass().getCanonicalName());
    -
    -        if(LOG.isDebugEnabled()) LOG.debug("Config 
content:"+config.root().render(ConfigRenderOptions.concise()));
    +    public static KafkaSourcedSpoutProvider createProvider(Config config) {
    +         String deserClsName = 
config.getString("dataSourceConfig.deserializerClass");
    +         final KafkaSourcedSpoutScheme scheme = new 
KafkaSourcedSpoutScheme(deserClsName, config) {
    +                 @Override
    +                 public List<Object> deserialize(byte[] ser) {
    +                         Object tmp = deserializer.deserialize(ser);
    +                         Map<String, Object> map = (Map<String, 
Object>)tmp;
    +                         if(tmp == null) return null;
    +                         return Arrays.asList(map.get("user"), tmp);
    +                 }
    +         };
    +         KafkaSourcedSpoutProvider provider = new 
KafkaSourcedSpoutProvider() {
    +                 @Override
    +                 public SchemeAsMultiScheme getStreamScheme(String 
deserClsName, Config context) {
    +                         return new SchemeAsMultiScheme(scheme);
    +                  }
    +         };
    +         return provider;
    +    }
     
    -        StormExecutionEnvironment env = 
ExecutionEnvironmentFactory.getStorm(config);
    -
    -        String deserClsName = 
config.getString("dataSourceConfig.deserializerClass");
    -        final KafkaSourcedSpoutScheme scheme = new 
KafkaSourcedSpoutScheme(deserClsName, config) {
    -                @Override
    -                public List<Object> deserialize(byte[] ser) {
    -                        Object tmp = deserializer.deserialize(ser);
    -                        Map<String, Object> map = (Map<String, Object>)tmp;
    -                        if(tmp == null) return null;
    -                        return Arrays.asList(map.get("user"), tmp);
    -                }
    -        };
    -        KafkaSourcedSpoutProvider provider = new 
KafkaSourcedSpoutProvider() {
    -                public SchemeAsMultiScheme getStreamScheme(String 
deserClsName, Config context) {
    -                        return new SchemeAsMultiScheme(scheme);
    -                }
    -        };
    +    public static void execWithDefaultPartition(Config config, 
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
             
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
                     .flatMap(new 
FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
                     .flatMap(new IPZoneDataJoinExecutor())
                     .alertWithConsumer("hdfsAuditLogEventStream", 
"hdfsAuditLogAlertExecutor");
             env.execute();
    +    }
    +
    +    public static void execWithBalancedPartition(Config config, 
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
    +        PartitionStrategy strategy = createStrategy(config);
    +        
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
    +                .flatMap(new 
FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
    +                .flatMap(new IPZoneDataJoinExecutor())
    +                .alertWithConsumer("hdfsAuditLogEventStream", 
"hdfsAuditLogAlertExecutor", strategy);
    +        env.execute();
    +    }
    +
    +   public static void main(String[] args) throws Exception{
    +        Config config = new ConfigOptionParser().load(args);
    +        LOG.info("Config class: " + config.getClass().getCanonicalName());
    +        if(LOG.isDebugEnabled()) LOG.debug("Config 
content:"+config.root().render(ConfigRenderOptions.concise()));
    +
    +        StormExecutionEnvironment env = 
ExecutionEnvironmentFactory.getStorm(config);
    +        KafkaSourcedSpoutProvider provider = createProvider(config);
    +        Boolean balancePartition = 
config.hasPath("eagleProps.balancePartitionEnabled") ? 
config.getBoolean("eagleProps.balancePartitionEnabled") : false;
    --- End diff --
    
    This kind of config key is not well designed


> watch message process backlog in Eagle UI
> -----------------------------------------
>
>                 Key: EAGLE-2
>                 URL: https://issues.apache.org/jira/browse/EAGLE-2
>             Project: Eagle
>          Issue Type: Improvement
>         Environment: production
>            Reporter: Edward Zhang
>            Assignee: Libin, Sun
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Message latency is a key factor for Eagle to enable realtime security 
> monitoring. For hdfs audit log monitoring, kafka is used as datasource. So 
> there is always some gap between current max offset in kafka and processed 
> offset in eagle. The gap is the backlog which eagle should consume as quickly 
> as possible. If the gap can be sampled every minute or every 20 seconds, 
> then we can tell whether eagle is catching up or lagging further behind.
> The command to get current max offset in kafka is 
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx --topic 
> hdfs_audit_log --time -1
> and Storm-kafka spout would store processed offset in zookeeper, in the 
> following znode:
> /consumers/hdfs_audit_log/eagle.hdfsaudit.consumer/partition_0 
> So technically we can get the gap and write that to eagle service then in UI 
> we can watch the backlog



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to