Repository: incubator-eagle Updated Branches: refs/heads/master e24de5c7e -> 229d7b907
[MINOR] add stream data source config for mr history job Author: wujinhu <wujinhu...@126.com> Closes #748 from wujinhu/EAGLE-796. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/229d7b90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/229d7b90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/229d7b90 Branch: refs/heads/master Commit: 229d7b9073a430de7dbbb12d39ddc330646eb458 Parents: e24de5c Author: wujinhu <wujinhu...@126.com> Authored: Fri Dec 16 14:55:47 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Fri Dec 16 14:55:47 2016 +0800 ---------------------------------------------------------------------- .../apache/eagle/jpm/mr/history/storm/JobHistorySpout.java | 9 ++++++--- ...eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml | 9 ++++++++- 2 files changed, 14 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/229d7b90/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 0cd30ae..d7daa5e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -140,7 +140,12 @@ public class JobHistorySpout extends BaseRichSpout { JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig()); JobHistoryZKStateManager.instance().ensureJobPartition(partitionId, numTotalPartitions); interceptor.setSpoutOutputCollector(collector); - + if (streamPublishers != null) { + for (StreamPublisher streamPublisher : streamPublishers) { + streamPublisher.setCollector(this.interceptor); + StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher); + } + } try { jhfLCM = new JobHistoryDAOImpl(jobHistoryEndpointConfig); driver = new JHFCrawlerDriverImpl( @@ -189,8 +194,6 @@ public class JobHistorySpout extends BaseRichSpout { if (streamPublishers != null) { for (StreamPublisher streamPublisher : streamPublishers) { declarer.declareStream(streamPublisher.stormStreamId(), new Fields("f1", "message")); - streamPublisher.setCollector(this.interceptor); - StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher); } } else { declarer.declare(new Fields("f1", "message")); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/229d7b90/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml index ccf3c6b..1ff9b85 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -96,6 +96,13 @@ <required>true</required> </property> <property> + <name>dataSourceConfig.zkConnection</name> + <displayName>Kafka Zookeeper Quorum</displayName> + <value>localhost:2181</value> + <description>kafka zookeeper connection</description> + <required>true</required> + </property> + <property> <name>dataSinkConfig.serializerClass</name> <displayName>Serializer Class For Kafka Message Value</displayName> <value>kafka.serializer.StringEncoder</value> @@ -221,7 +228,7 @@ <type>string</type> </column> <column> - <name>queue</name> + <name>hostname</name> <type>string</type> </column> <column>