[
https://issues.apache.org/jira/browse/EAGLE-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15043778#comment-15043778
]
ASF GitHub Bot commented on EAGLE-2:
------------------------------------
Github user haoch commented on a diff in the pull request:
https://github.com/apache/incubator-eagle/pull/8#discussion_r46768435
--- Diff:
eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
---
@@ -37,34 +44,72 @@
public class HdfsAuditLogProcessorMain {
private static final Logger LOG =
LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
- public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
+ public static PartitionStrategy createStrategy(Config config) {
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS +
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS +
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String username =
config.getString(EagleConfigConstants.EAGLE_PROPS + "." +
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ String password =
config.getString(EagleConfigConstants.EAGLE_PROPS + "." +
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ String topic = config.getString("dataSourceConfig.topic");
+ DataDistributionDao dao = new DataDistributionDaoImpl(host, port,
username, password, topic);
+ PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+ String key1 = EagleConfigConstants.EAGLE_PROPS +
".partitionRefreshIntervalInMin";
+ Integer partitionRefreshIntervalInMin = config.hasPath(key1) ?
config.getInt(key1) : 60;
+ String key2 = EagleConfigConstants.EAGLE_PROPS +
".kafkaStatisticRangeInMin";
+ Integer kafkaStatisticRangeInMin = config.hasPath(key2) ?
config.getInt(key2) : 60;
+ PartitionStrategy strategy = new PartitionStrategyImpl(dao,
algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE,
kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
+ return strategy;
+ }
- LOG.info("Config class: " + config.getClass().getCanonicalName());
-
- if(LOG.isDebugEnabled()) LOG.debug("Config
content:"+config.root().render(ConfigRenderOptions.concise()));
+ public static KafkaSourcedSpoutProvider createProvider(Config config) {
+ String deserClsName =
config.getString("dataSourceConfig.deserializerClass");
+ final KafkaSourcedSpoutScheme scheme = new
KafkaSourcedSpoutScheme(deserClsName, config) {
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ Map<String, Object> map = (Map<String,
Object>)tmp;
+ if(tmp == null) return null;
+ return Arrays.asList(map.get("user"), tmp);
+ }
+ };
+ KafkaSourcedSpoutProvider provider = new
KafkaSourcedSpoutProvider() {
+ @Override
+ public SchemeAsMultiScheme getStreamScheme(String
deserClsName, Config context) {
+ return new SchemeAsMultiScheme(scheme);
+ }
+ };
+ return provider;
+ }
- StormExecutionEnvironment env =
ExecutionEnvironmentFactory.getStorm(config);
-
- String deserClsName =
config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new
KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), tmp);
- }
- };
- KafkaSourcedSpoutProvider provider = new
KafkaSourcedSpoutProvider() {
- public SchemeAsMultiScheme getStreamScheme(String
deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
- };
+ public static void execWithDefaultPartition(Config config,
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
.flatMap(new
FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
.flatMap(new IPZoneDataJoinExecutor())
.alertWithConsumer("hdfsAuditLogEventStream",
"hdfsAuditLogAlertExecutor");
env.execute();
+ }
+
+ public static void execWithBalancedPartition(Config config,
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+ PartitionStrategy strategy = createStrategy(config);
+
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
+ .flatMap(new
FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
+ .flatMap(new IPZoneDataJoinExecutor())
+ .alertWithConsumer("hdfsAuditLogEventStream",
"hdfsAuditLogAlertExecutor", strategy);
+ env.execute();
+ }
+
+ public static void main(String[] args) throws Exception{
+ Config config = new ConfigOptionParser().load(args);
+ LOG.info("Config class: " + config.getClass().getCanonicalName());
+ if(LOG.isDebugEnabled()) LOG.debug("Config
content:"+config.root().render(ConfigRenderOptions.concise()));
+
+ StormExecutionEnvironment env =
ExecutionEnvironmentFactory.getStorm(config);
+ KafkaSourcedSpoutProvider provider = createProvider(config);
+ Boolean balancePartition =
config.hasPath("eagleProps.balancePartitionEnabled") ?
config.getBoolean("eagleProps.balancePartitionEnabled") : false;
--- End diff --
This kind of config key is not very good
> watch message process backlog in Eagle UI
> -----------------------------------------
>
> Key: EAGLE-2
> URL: https://issues.apache.org/jira/browse/EAGLE-2
> Project: Eagle
> Issue Type: Improvement
> Environment: production
> Reporter: Edward Zhang
> Assignee: Libin, Sun
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> Message latency is a key factor for Eagle to enable realtime security
> monitoring. For hdfs audit log monitoring, kafka is used as datasource. So
> there is always some gap between current max offset in kafka and processed
> offset in eagle. The gap is the backlog, which eagle should consume as quickly
> as possible. If the gap can be sampled every minute or every 20 seconds,
> then we understand whether eagle is catching up or lagging further behind.
> The command to get current max offset in kafka is
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx --topic
> hdfs_audit_log --time -1
> and Storm-kafka spout would store processed offset in zookeeper, in the
> following znode:
> /consumers/hdfs_audit_log/eagle.hdfsaudit.consumer/partition_0
> So technically we can get the gap and write it to the eagle service; then, in
> the UI, we can watch the backlog.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)