Repository: eagle Updated Branches: refs/heads/master 77fbff720 -> ab50e62ac
[MINOR] update SparkHistoryJobApp & TopologyCheckApp configs 1. add `service.flushLimit` in SparkHistoryJobApp config 2. update TopologyCheckAppConfig.java Author: Zhao, Qingwen <[email protected]> Closes #761 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ab50e62a Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ab50e62a Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ab50e62a Branch: refs/heads/master Commit: ab50e62acb2483dd009d87b2d68814e13e3e3d92 Parents: 77fbff7 Author: Zhao, Qingwen <[email protected]> Authored: Thu Dec 29 15:05:57 2016 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Thu Dec 29 15:05:57 2016 +0800 ---------------------------------------------------------------------- .../spark/history/SparkHistoryJobAppConfig.java | 5 ++ .../history/crawl/JHFSparkEventReader.java | 2 +- ...spark.history.SparkHistoryJobAppProvider.xml | 6 +++ .../src/main/resources/application.conf | 1 + .../apache/eagle/topology/TopologyCheckApp.java | 11 ++-- .../eagle/topology/TopologyCheckAppConfig.java | 20 ++++---- ....eagle.topology.TopologyCheckAppProvider.xml | 54 ++++++++++---------- .../src/main/resources/application.conf | 6 +-- .../src/test/resources/application.conf | 10 ++-- 9 files changed, 64 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java index adde60b..9646fb1 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java @@ -87,6 +87,10 @@ public class SparkHistoryJobAppConfig implements Serializable { if (config.hasPath("service.basePath")) { this.eagleInfo.basePath = config.getString("service.basePath"); } + this.eagleInfo.flushLimit = 500; + if (config.hasPath("service.flushLimit")) { + this.eagleInfo.flushLimit = config.getInt("service.flushLimit"); + } this.stormConfig.siteId = config.getString("siteId"); this.stormConfig.spoutCrawlInterval = config.getInt("topology.spoutCrawlInterval"); @@ -126,5 +130,6 @@ public class SparkHistoryJobAppConfig implements Serializable { public String password; public String basePath; public int timeout; + public int flushLimit; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java index 82e8a41..05e35e4 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java @@ -690,7 +690,7 @@ public class JHFSparkEventReader { private void flushEntities(Collection entities, boolean forceFlush) { this.createEntities.addAll(entities); - if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) { + if (forceFlush || this.createEntities.size() >= config.eagleInfo.flushLimit) { try { this.doFlush(this.createEntities); this.createEntities.clear(); http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml index 17a3a4a..8c0d472 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml @@ -65,6 +65,12 @@ <description>default timeout is 30s</description> <value>300</value> </property> + <property> + <name>service.flushLimit</name> + <displayName>service flushing limit</displayName> + <description>flushing entities limit</description> + <value>500</value> + </property> <!-- datasource config --> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf index 2839915..62f66f6 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf @@ -28,6 +28,7 @@ password : "secret", basePath : "/rest", readTimeOutSeconds : 2 + flushLimit: 500 }, "zookeeper" : { http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java index ba5914b..87ff27a 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java @@ -30,7 +30,6 @@ import org.apache.eagle.topology.storm.TopologyDataPersistBolt; public class TopologyCheckApp extends StormApplication { - private static final String SINK_TASK_NUM = "topology.numOfSinkTasks"; private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream"; @Override @@ -41,7 +40,6 @@ public class TopologyCheckApp extends StormApplication { String persistBoltName = TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME; String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME; String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME; - int numOfSinkTasks = config.getInt(SINK_TASK_NUM); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout( @@ -59,10 +57,15 @@ public class TopologyCheckApp extends StormApplication { topologyBuilder.setBolt( parseBoltName, new HealthCheckParseBolt(), - topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName); + topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt + ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName); StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config); - topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName); + topologyBuilder.setBolt( + kafkaSinkBoltName, + sinkBolt, + topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt + ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numKafkaSinkBolt).shuffleGrouping(parseBoltName); return topologyBuilder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index f6d61f6..90a3773 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -69,14 +69,15 @@ public class TopologyCheckAppConfig implements Serializable { this.config = config; this.dataExtractorConfig.site = config.getString("siteId"); - this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("dataExtractorConfig.fetchDataIntervalInSecs"); + this.dataExtractorConfig.fetchDataIntervalInSecs = config.getLong("topology.fetchDataIntervalInSecs"); this.dataExtractorConfig.parseThreadPoolSize = MAX_NUM_THREADS; - if (config.hasPath("dataExtractorConfig.parseThreadPoolSize")) { - this.dataExtractorConfig.parseThreadPoolSize = config.getInt("dataExtractorConfig.parseThreadPoolSize"); + if (config.hasPath("topology.parseThreadPoolSize")) { + this.dataExtractorConfig.parseThreadPoolSize = config.getInt("topology.parseThreadPoolSize"); } - this.dataExtractorConfig.numDataFetcherSpout = config.getInt("dataExtractorConfig.numDataFetcherSpout"); - this.dataExtractorConfig.numEntityPersistBolt = config.getInt("dataExtractorConfig.numEntityPersistBolt"); - String resolveCls = config.getString("dataExtractorConfig.rackResolverCls"); + this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout"); + this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt"); + this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt"); + String resolveCls = config.getString("topology.rackResolverCls"); try { this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls); } catch (ClassNotFoundException e) { @@ -85,7 +86,7 @@ public class TopologyCheckAppConfig implements Serializable { //e.printStackTrace(); } - if (config.hasPath("dataSourceConfig.hbase") && config.getBoolean("dataSourceConfig.hbase.enabled")) { + if (config.hasPath("dataSourceConfig.hbase.enabled") && config.getBoolean("dataSourceConfig.hbase.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.HBASE); hBaseConfig = new HBaseConfig(); @@ -98,14 +99,14 @@ public class TopologyCheckAppConfig implements Serializable { hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null); } - if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) { + if (config.hasPath("dataSourceConfig.mr.enabled") && config.getBoolean("dataSourceConfig.mr.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.MR); mrConfig = new MRConfig(); mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*"); mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null); } - if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) { + if (config.hasPath("dataSourceConfig.hdfs.enabled") && config.getBoolean("dataSourceConfig.hdfs.enabled")) { topologyTypes.add(TopologyConstants.TopologyType.HDFS); hdfsConfig = new HdfsConfig(); hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*"); @@ -116,6 +117,7 @@ public class TopologyCheckAppConfig implements Serializable { public String site; public int numDataFetcherSpout; public int numEntityPersistBolt; + public int numKafkaSinkBolt; public long fetchDataIntervalInSecs; public int parseThreadPoolSize; public Class<? extends TopologyRackResolver> resolverCls; http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index 2089a2f..b4e3695 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -24,38 +24,51 @@ <configuration> <!-- org.apache.eagle.topology.TopologyCheckApp --> <property> - <name>dataExtractorConfig.fetchDataIntervalInSecs</name> + <name>topology.fetchDataIntervalInSecs</name> <displayName>Fetch Data Interval in Secs</displayName> <description>fetch data interval in secs</description> <value>300</value> </property> <property> - <name>dataExtractorConfig.parseThreadPoolSize</name> + <name>topology.parseThreadPoolSize</name> <displayName>Parser Thread Pool Size</displayName> <description>parser thread pool size</description> <value>5</value> </property> <property> - <name>dataExtractorConfig.numDataFetcherSpout</name> + <name>topology.numDataFetcherSpout</name> <displayName>Spout Task Number</displayName> <description>spout task number</description> <value>1</value> </property> <property> - <name>dataExtractorConfig.numEntityPersistBolt</name> - <displayName>Bolt Task Number</displayName> - <description>bolt task number</description> + <name>topology.numEntityPersistBolt</name> + <displayName>Storage Bolt Task Number</displayName> + <description>number of persist tasks to the storage</description> <value>1</value> </property> + <property> + <name>topology.numOfKafkaSinkBolt</name> + <displayName>Kafka Sink Task Number</displayName> + <value>2</value> + <description>number of sinks to alert engine</description> + </property> <property> - <name>dataExtractorConfig.rackResolverCls</name> + <name>topology.rackResolverCls</name> <displayName>Rack Resolver Class</displayName> <description>rack resolver class</description> <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value> </property> <property> + <name>dataSourceConfig.hbase.enabled</name> + <displayName>HBase Topology Check Enabled</displayName> + <description>HBase topology status check enabled</description> + <value>false</value> + <required>true</required> + </property> + <property> <name>dataSourceConfig.hbase.zkQuorum</name> <displayName>HBase Zookeeper Quorum</displayName> <description>hbase zookeeper quorum (optional)</description> @@ -71,13 +84,6 @@ <description>hbase zookeeper client port (optional)</description> </property> - - <property> - <name>dataSourceConfig.hdfs.namenodeUrl</name> - <displayName>Hdfs Namenode Web URL</displayName> - <description>hdfs namenode web url for HDFS monitor</description> - <value>http://sandbox.hortonworks.com:50070</value> - </property> <property> <name>dataSourceConfig.hdfs.enabled</name> <displayName>HDFS Topology Check Enabled</displayName> @@ -86,6 +92,13 @@ <required>true</required> </property> <property> + <name>dataSourceConfig.hdfs.namenodeUrl</name> + <displayName>Hdfs Namenode Web URL</displayName> + <description>hdfs namenode web url for HDFS monitor</description> + <value>http://sandbox.hortonworks.com:50070</value> + </property> + + <property> <name>dataSourceConfig.mr.enabled</name> <displayName>MR Topology Check Enabled</displayName> <description>MR topology status check enabled</description> @@ -104,20 +117,7 @@ <description>URL for history server monitor (optional)</description> <value></value> </property> - <property> - <name>dataSourceConfig.hbase.enabled</name> - <displayName>HBase Topology Check Enabled</displayName> - <description>HBase topology status check enabled</description> - <value>false</value> - <required>true</required> - </property> - <property> - <name>topology.numOfSinkTasks</name> - <displayName>topology.numOfSinkTasks</displayName> - <value>2</value> - <description>number of sink tasks</description> - </property> <!-- data sink configurations --> <property> http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf index 5d5f1d3..180e57e 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf @@ -20,15 +20,11 @@ workers : 1, topology : { - "numOfSinkTasks" : 2 - } - - dataExtractorConfig : { - "site": "sandbox", "fetchDataIntervalInSecs": 300, "parseThreadPoolSize": 5, "numDataFetcherSpout" : 1, "numEntityPersistBolt" : 1, + "numOfKafkaSinkBolt": 2, "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver" } http://git-wip-us.apache.org/repos/asf/eagle/blob/ab50e62a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf index da52f65..c6f17ae 100644 --- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf +++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf @@ -18,13 +18,13 @@ mode : "LOCAL", workers : 1, - dataExtractorConfig : { - "site": "sandbox", - "fetchDataIntervalInSecs": 15, + topology : { + "fetchDataIntervalInSecs": 300, "parseThreadPoolSize": 5, - "checkRetryTime" : 3, "numDataFetcherSpout" : 1, - "numEntityPersistBolt" : 1 + "numEntityPersistBolt" : 1, + "numOfKafkaSinkBolt": 2, + "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver" } dataSourceConfig : {
