Repository: eagle Updated Branches: refs/heads/master 257a3517b -> c9c475e2a
[MINOR] Refine hdfs log throughput monitor 1. refine IEagleServiceClient 2. refine hdfs log throughput monitor 3. add topology.message.timeout.secs in topology health check app config Author: Zhao, Qingwen <[email protected]> Closes #764 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c9c475e2 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c9c475e2 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c9c475e2 Branch: refs/heads/master Commit: c9c475e2a6b6e3406764c5d7fbfa44a3c5eeead8 Parents: 257a351 Author: Zhao, Qingwen <[email protected]> Authored: Mon Jan 9 09:54:10 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Mon Jan 9 09:54:10 2017 +0800 ---------------------------------------------------------------------- .../eagle/service/client/IEagleServiceClient.java | 2 ++ .../service/client/impl/EagleServiceBaseClient.java | 7 ++++++- .../history/MRHistoryJobApplicationHealthCheck.java | 3 ++- .../JobConfigurationCreationServiceListener.java | 3 ++- .../jpm/mr/history/parser/TaskFailureListener.java | 3 ++- .../running/parser/MRJobEntityCreationHandler.java | 5 +++-- .../eagle/jpm/mr/running/parser/MRJobParserTest.java | 2 +- ....jpm.spark.history.SparkHistoryJobAppProvider.xml | 2 +- .../security/traffic/HadoopLogAccumulatorBolt.java | 15 ++++++++++++--- .../security/traffic/HadoopLogTrafficPersist.java | 12 ++++-------- .../eagle/security/traffic/SimpleWindowCounter.java | 12 +++++++++--- .../auditlog/AbstractHdfsAuditLogApplication.java | 6 ++++-- ...gle.security.auditlog.HdfsAuditLogAppProvider.xml | 10 ++++++++-- ...pache.eagle.topology.TopologyCheckAppProvider.xml | 12 +++++++++--- 14 files changed, 65 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java index ce62eee..01489fe 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java @@ -28,6 +28,8 @@ public interface IEagleServiceClient extends IEagleServiceRequestBuilder, Closea Client getJerseyClient(); + void setReadTimeout(int timeoutMs); + IEagleServiceClient silence(boolean silence); /** http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java index 3b717d8..4abf014 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java @@ -122,6 +122,10 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient { return sb; } + public void setReadTimeout(int timeoutMs) { + client.setReadTimeout(timeoutMs); + } + protected static String marshall(List<?> entities) throws JsonMappingException, JsonGenerationException, IOException { final JsonFactory factory = new JsonFactory(); final ObjectMapper mapper = new ObjectMapper(factory); @@ -132,7 +136,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient { protected <E extends TaggedLogAPIEntity> Map<String,List<E>> groupEntitiesByService(List<E> entities) throws EagleServiceClientException { Map<String,List<E>> serviceEntityMap = new HashMap<String, List<E>>(); if(LOG.isDebugEnabled()) LOG.debug("Grouping entities by service name"); - for(E entity: entities){ + for(E entity: entities) { if(entity == null) { LOG.warn("Skip null entity"); continue; @@ -303,6 +307,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient { } } this.isStopped = true; + this.getJerseyClient().destroy(); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java index 20506c0..6f337e7 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java @@ -50,7 +50,8 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); String message = ""; try { http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java index 96f2b3b..6dc5791 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -63,7 +63,8 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List<JobConfigurationAPIEntity> list = new ArrayList<>(); list.add(jobConfigurationEntity); http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index 61be66f..40e6432 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -118,7 +118,8 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); int tried = 0; while (tried <= MAX_RETRY_TIMES) { http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java index abd0594..c2cbbe5 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java @@ -77,7 +77,8 @@ public class MRJobEntityCreationHandler { eagleServiceConfig.eagleServicePort, eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); try { return createEntities(client); } catch (Exception e) { @@ -105,7 +106,7 @@ public class MRJobEntityCreationHandler { entities.clear(); } catch (Exception e) { - LOG.warn("exception found when flush entities, {}", e); + LOG.warn("exception found when flush entities", e); if (!success && count < MAX_RETRY_COUNT) { LOG.info("Sleep for a while before retrying"); Thread.sleep(10 * 1000); http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index f2e581c..a2fb6ca 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -401,7 +401,7 @@ public class MRJobParserTest { Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null); Assert.assertTrue(entities.isEmpty()); verify(client, times(2)).create(any()); - verify(client, times(2)).getJerseyClient(); + verify(client, times(1)).getJerseyClient(); verify(client, times(1)).close(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml index ef958cc..4c4d1cd 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml @@ -61,7 +61,7 @@ <property> <name>topology.message.timeout.secs</name> <displayName>topology message timeout (secs)</displayName> - <description>default timeout is 30s</description> + <description>default timeout is 300s</description> <value>300</value> </property> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java index c5cc0df..3377b4a 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java @@ -50,6 +50,7 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt { private int taskId; private String site; private String appId; + private Config config; private HadoopLogTrafficPersist client; private SimpleWindowCounter accumulator; private OutputCollector collector; @@ -67,15 +68,15 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt { } else { this.windowSize = DEFAULT_WINDOW_SIZE; } - this.accumulator = new SimpleWindowCounter(windowSize); - this.client = new HadoopLogTrafficPersist(config); - + this.config = config; } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.taskId = context.getThisTaskId(); this.collector = collector; + this.client = new HadoopLogTrafficPersist(config); + this.accumulator = new SimpleWindowCounter(windowSize); } @Override @@ -87,6 +88,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt { collector.ack(input); if (!isOrdered(timeInMin)) { LOG.warn("data is out of order, the estimated throughput may be incorrect"); + if (LOG.isDebugEnabled()) { + LOG.debug("time queue {} with event timestamp={}", accumulator.getTimeQueue().toString(), timeInMin); + } return; } if (accumulator.isFull()) { @@ -124,4 +128,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } + + @Override + public void cleanup() { + this.client.close(); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java index 29f61ca..59061c1 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java @@ -28,20 +28,20 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; public class HadoopLogTrafficPersist implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class); private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize"; - private final Config config; private IEagleServiceClient client; private int batchSize; - private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>(); + private List<TaggedLogAPIEntity> entityBucket = new ArrayList<>(); public HadoopLogTrafficPersist(Config config) { - this.config = config; this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? config.getInt(SINK_BATCH_SIZE) : 1; + this.client = new EagleServiceClientImpl(config); } public void emitMetric(GenericMetricEntity metricEntity) { @@ -51,11 +51,9 @@ public class HadoopLogTrafficPersist implements Serializable { } try { - client = new EagleServiceClientImpl(config); GenericServiceAPIResponseEntity response = client.create(entityBucket); if (response.isSuccess()) { - LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp()); - + LOG.info("persist {} entities with the earliest time={}", entityBucket.size(), entityBucket.get(0).getTimestamp()); } else { LOG.error("Service side error: {}", response.getException()); } @@ -63,14 +61,12 @@ public class HadoopLogTrafficPersist implements Serializable { LOG.error(e.getMessage(), e); } finally { entityBucket.clear(); - close(); } } public void close() { try { if (client != null) { - this.client.getJerseyClient().destroy(); this.client.close(); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java index 5293577..4b5cbc8 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java @@ -23,17 +23,19 @@ import java.io.Serializable; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +/** + * non-thread safe + */ public class SimpleWindowCounter implements Serializable { private int windowSize; - private Map<Long, Long> counter; private Queue<Long> timeQueue; public SimpleWindowCounter(int size) { this.windowSize = size; counter = new ConcurrentHashMap<>(windowSize); - timeQueue = new PriorityQueue<>(); + timeQueue = new PriorityQueue<>(windowSize, (a,b) -> a.compareTo(b)); } public boolean insert(long timestamp, long countVal) { @@ -63,7 +65,7 @@ public class SimpleWindowCounter implements Serializable { return counter.isEmpty(); } - public synchronized Tuple2<Long, Long> poll() { + public Tuple2<Long, Long> poll() { long oldestTimestamp = timeQueue.poll(); Tuple2<Long, Long> pair = new Tuple2<>(oldestTimestamp, counter.get(oldestTimestamp)); counter.remove(oldestTimestamp); @@ -74,4 +76,8 @@ public class SimpleWindowCounter implements Serializable { return timeQueue.peek(); } + public Queue<Long> getTimeQueue() { + return timeQueue; + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index bc8ceb1..b21d62d 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -47,6 +47,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks"; public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled"; + private final static String TRAFFIC_MONITOR_TASK_NUM = "topology.numOfTrafficMonitorTasks"; @Override public StormTopology execute(Config config, StormEnvironment environment) { @@ -59,6 +60,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM); int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM); int numOfSinkTasks = config.getInt(SINK_TASK_NUM); + int numOfTrafficMonitorTasks = config.hasPath(TRAFFIC_MONITOR_TASK_NUM) ? config.getInt(TRAFFIC_MONITOR_TASK_NUM) : numOfParserTasks; builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); @@ -88,8 +90,8 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication { if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) { HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config); - BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks); - auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt"); + BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfTrafficMonitorTasks); + auditLogAccumulatorDeclarer.setNumTasks(numOfTrafficMonitorTasks).shuffleGrouping("parserBolt"); } // ------------------------------ http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml index f82f8b3..49694a5 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -62,6 +62,12 @@ <description>number of sink tasks</description> </property> <property> + <name>topology.numOfTrafficMonitorTasks</name> + <displayName>Topology Traffic Monitor Tasks</displayName> + <value>2</value> + <description>number of traffic monitor tasks</description> + </property> + <property> <name>topology.message.timeout.secs</name> <displayName>topology message timeout (secs)</displayName> <description>default timeout is 60s</description> @@ -172,8 +178,8 @@ <property> <name>dataSinkConfig.metricSinkBatchSize</name> <displayName>Batch Size for Flushing Traffic Metrics</displayName> - <value>1</value> - <description>batch size of flushing metrics </description> + <value>10</value> + <description>batch size of flushing metrics</description> </property> <!-- web app related configurations --> http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index c5a0e84..bfe43ed 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -35,6 +35,12 @@ <value>5</value> </property> <property> + <name>topology.message.timeout.secs</name> + <displayName>topology message timeout (secs)</displayName> + <description>default timeout is 60s</description> + <value>60</value> + </property> + <property> <name>topology.numDataFetcherSpout</name> <displayName>Spout Task Number</displayName> <description>spout task number</description> @@ -42,15 +48,15 @@ </property> <property> <name>topology.numEntityPersistBolt</name> - <displayName>Storage Bolt Task Number</displayName> - <description>number of persist tasks to the storage</description> + <displayName>Data Storage Task Number</displayName> + <description>number of persist tasks writing to the storage</description> <value>1</value> </property> <property> <name>topology.numOfKafkaSinkBolt</name> <displayName>Kafka Sink Task Number</displayName> <value>2</value> - <description>number of sinks to alert engine</description> + <description>number of sinks connected to alert engine</description> </property> <property>
