Github user haoch commented on a diff in the pull request:
https://github.com/apache/incubator-eagle/pull/8#discussion_r46768435
--- Diff:
eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
---
@@ -37,34 +44,72 @@
public class HdfsAuditLogProcessorMain {
private static final Logger LOG =
LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
- public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
+ public static PartitionStrategy createStrategy(Config config) {
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS +
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS +
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String username =
config.getString(EagleConfigConstants.EAGLE_PROPS + "." +
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ String password =
config.getString(EagleConfigConstants.EAGLE_PROPS + "." +
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ String topic = config.getString("dataSourceConfig.topic");
+ DataDistributionDao dao = new DataDistributionDaoImpl(host, port,
username, password, topic);
+ PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+ String key1 = EagleConfigConstants.EAGLE_PROPS +
".partitionRefreshIntervalInMin";
+ Integer partitionRefreshIntervalInMin = config.hasPath(key1) ?
config.getInt(key1) : 60;
+ String key2 = EagleConfigConstants.EAGLE_PROPS +
".kafkaStatisticRangeInMin";
+ Integer kafkaStatisticRangeInMin = config.hasPath(key2) ?
config.getInt(key2) : 60;
+ PartitionStrategy strategy = new PartitionStrategyImpl(dao,
algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE,
kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
+ return strategy;
+ }
- LOG.info("Config class: " + config.getClass().getCanonicalName());
-
- if(LOG.isDebugEnabled()) LOG.debug("Config
content:"+config.root().render(ConfigRenderOptions.concise()));
+ public static KafkaSourcedSpoutProvider createProvider(Config config) {
+ String deserClsName =
config.getString("dataSourceConfig.deserializerClass");
+ final KafkaSourcedSpoutScheme scheme = new
KafkaSourcedSpoutScheme(deserClsName, config) {
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ Map<String, Object> map = (Map<String,
Object>)tmp;
+ if(tmp == null) return null;
+ return Arrays.asList(map.get("user"), tmp);
+ }
+ };
+ KafkaSourcedSpoutProvider provider = new
KafkaSourcedSpoutProvider() {
+ @Override
+ public SchemeAsMultiScheme getStreamScheme(String
deserClsName, Config context) {
+ return new SchemeAsMultiScheme(scheme);
+ }
+ };
+ return provider;
+ }
- StormExecutionEnvironment env =
ExecutionEnvironmentFactory.getStorm(config);
-
- String deserClsName =
config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new
KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), tmp);
- }
- };
- KafkaSourcedSpoutProvider provider = new
KafkaSourcedSpoutProvider() {
- public SchemeAsMultiScheme getStreamScheme(String
deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
- };
+ public static void execWithDefaultPartition(Config config,
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
.flatMap(new
FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
.flatMap(new IPZoneDataJoinExecutor())
.alertWithConsumer("hdfsAuditLogEventStream",
"hdfsAuditLogAlertExecutor");
env.execute();
+ }
+
+ public static void execWithBalancedPartition(Config config,
StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+ PartitionStrategy strategy = createStrategy(config);
+
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
+ .flatMap(new
FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
+ .flatMap(new IPZoneDataJoinExecutor())
+ .alertWithConsumer("hdfsAuditLogEventStream",
"hdfsAuditLogAlertExecutor", strategy);
+ env.execute();
+ }
+
+ public static void main(String[] args) throws Exception{
+ Config config = new ConfigOptionParser().load(args);
+ LOG.info("Config class: " + config.getClass().getCanonicalName());
+ if(LOG.isDebugEnabled()) LOG.debug("Config
content:"+config.root().render(ConfigRenderOptions.concise()));
+
+ StormExecutionEnvironment env =
ExecutionEnvironmentFactory.getStorm(config);
+ KafkaSourcedSpoutProvider provider = createProvider(config);
+ Boolean balancePartition =
config.hasPath("eagleProps.balancePartitionEnabled") ?
config.getBoolean("eagleProps.balancePartitionEnabled") : false;
--- End diff --
Such a config key name is not ideal
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---