Repository: nutch Updated Branches: refs/heads/master 70622c3e1 -> e53b34b23
Fix for NUTCH-2132: Publisher/Subscriber model for Nutch to emit events, this closes #138 Project: http://git-wip-us.apache.org/repos/asf/nutch/repo Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/e53b34b2 Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/e53b34b2 Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/e53b34b2 Branch: refs/heads/master Commit: e53b34b2322f2d071981a72577644a225642ecbc Parents: 70622c3 Author: Sujen Shah <[email protected]> Authored: Wed Sep 7 09:11:18 2016 -0700 Committer: Sujen Shah <[email protected]> Committed: Wed Sep 7 09:13:11 2016 -0700 ---------------------------------------------------------------------- build.xml | 3 + conf/nutch-default.xml | 57 +++++++ ivy/ivy.xml | 3 + .../org/apache/nutch/fetcher/FetcherThread.java | 37 ++++- .../nutch/fetcher/FetcherThreadEvent.java | 147 +++++++++++++++++++ .../nutch/fetcher/FetcherThreadPublisher.java | 58 ++++++++ src/java/org/apache/nutch/metadata/Nutch.java | 11 ++ .../apache/nutch/publisher/NutchPublisher.java | 46 ++++++ .../apache/nutch/publisher/NutchPublishers.java | 80 ++++++++++ src/plugin/build.xml | 1 + src/plugin/nutch-extensionpoints/plugin.xml | 4 + src/plugin/publish-rabbitmq/build-ivy.xml | 54 +++++++ src/plugin/publish-rabbitmq/build.xml | 27 ++++ src/plugin/publish-rabbitmq/ivy.xml | 42 ++++++ src/plugin/publish-rabbitmq/plugin.xml | 43 ++++++ .../rabbitmq/RabbitMQPublisherImpl.java | 92 ++++++++++++ .../nutch/publisher/rabbitmq/package-info.java | 22 +++ 17 files changed, 725 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 0ee60a1..8c9e778 100644 --- a/build.xml +++ b/build.xml @@ -209,6 +209,7 @@ <packageset dir="${plugins.dir}/protocol-httpclient/src/java"/> <packageset 
dir="${plugins.dir}/protocol-interactiveselenium/src/java"/> <packageset dir="${plugins.dir}/protocol-selenium/src/java"/> + <packageset dir="${plugins.dir}/publish-rabbitmq/src/java"/> <packageset dir="${plugins.dir}/scoring-depth/src/java"/> <packageset dir="${plugins.dir}/scoring-similarity/src/java"/> <packageset dir="${plugins.dir}/scoring-link/src/java"/> @@ -652,6 +653,7 @@ <packageset dir="${plugins.dir}/protocol-http/src/java"/> <packageset dir="${plugins.dir}/protocol-httpclient/src/java"/> <packageset dir="${plugins.dir}/protocol-selenium/src/java"/> + <packageset dir="${plugins.dir}/publish-rabbitmq/src/java"/> <packageset dir="${plugins.dir}/scoring-depth/src/java"/> <packageset dir="${plugins.dir}/scoring-similarity/src/java"/> <packageset dir="${plugins.dir}/scoring-link/src/java"/> @@ -1073,6 +1075,7 @@ <source path="${plugins.dir}/protocol-http/src/java/" /> <source path="${plugins.dir}/protocol-http/src/test/" /> <source path="${plugins.dir}/protocol-selenium/src/java"/> + <source path="${plugins.dir}/publish-rabbitmq/src/java"/> <source path="${plugins.dir}/scoring-depth/src/java/" /> <source path="${plugins.dir}/scoring-similarity/src/java/" /> <source path="${plugins.dir}/scoring-link/src/java/" /> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/conf/nutch-default.xml ---------------------------------------------------------------------- diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index ec9d2d4..ea7df89 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -983,6 +983,7 @@ </property> <property> + <name>fetcher.store.robotstxt</name> <value>false</value> <description>If true (and fetcher.store.content is also true), @@ -992,6 +993,13 @@ </description> </property> +<property> + <name>fetcher.publisher</name> + <value>false</value> + <description>Set this value to true if you want to use an implementation of the Publisher/Subscriber model. 
Make sure to set corresponding + Publisher implementation specific properties</description> +</property> + <!-- moreindexingfilter plugin properties --> <property> @@ -2253,4 +2261,53 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter--> </description> </property> +<!-- publisher properties + Do not forget to add the name of your publisher implementation + in plugin.includes ex- publish-rabbitmq --> +<property> + <name>publisher.queue.type</name> + <value></value> + <description> + Choose the type of Queue being used (ex - RabbitMQ, ActiveMq, Kafka, etc). + Currently there exists an implementation for RabbitMQ producer. + </description> +</property> +<property> + <name>publisher.order</name> + <value></value> + <description> + The order in which the publisher queues would be loaded + </description> +</property> +<!-- RabbitMQ properties --> +<property> + <name>rabbitmq.exchange.server</name> + <value></value> + <description> + Name for the exchange server to use. Default - "fetcher_log" + </description> +</property> +<property> + <name>rabbitmq.exchange.type</name> + <value></value> + <description> + There are a few exchange types available: direct, topic, headers and fanout. Default "fanout". + </description> +</property> +<property> + <name>rabbitmq.host</name> + <value></value> + <description> + Host on which the RabbitMQ server is running. Default "localhost". + </description> +</property> +<property> + <name>rabbitmq.queue.routingkey</name> + <value></value> + <description> + The routingKey used by publisher to publish messages to specific queues. If the exchange type is "fanout", then this property is ignored. 
+ </description> +</property> + + </configuration> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/ivy/ivy.xml ---------------------------------------------------------------------- diff --git a/ivy/ivy.xml b/ivy/ivy.xml index a9a83ae..adc7e91 100644 --- a/ivy/ivy.xml +++ b/ivy/ivy.xml @@ -127,6 +127,9 @@ <dependency org="org.apache.wicket" name="wicket-spring" rev="6.16.0" conf="*->default" /> <dependency org="de.agilecoders.wicket" name="wicket-bootstrap-core" rev="0.9.2" conf="*->default" /> <dependency org="de.agilecoders.wicket" name="wicket-bootstrap-extensions" rev="0.9.2" conf="*->default" /> + + <!-- RabbitMQ dependencies --> + <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" conf="*->default" /> <!--global exclusion --> <exclude module="jmxtools" /> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThread.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java index 449e220..5a1425d 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherThread.java +++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java @@ -36,6 +36,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.crawl.NutchWritable; import org.apache.nutch.crawl.SignatureFactory; +import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType; import org.apache.nutch.metadata.Metadata; import org.apache.nutch.metadata.Nutch; import org.apache.nutch.net.URLExemptionFilters; @@ -135,6 +136,10 @@ public class FetcherThread extends Thread { //Used by the REST service private FetchNode fetchNode; private boolean reportToNutchServer; + + //Used for publishing events + private FetcherThreadPublisher publisher; + private boolean activatePublisher; public FetcherThread(Configuration conf, AtomicInteger activeThreads, 
FetchItemQueues fetchQueues, QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter, @@ -164,6 +169,10 @@ public class FetcherThread extends Thread { this.storingContent = storingContent; this.pages = pages; this.bytes = bytes; + + if((activatePublisher=conf.getBoolean("fetcher.publisher", false))) + this.publisher = new FetcherThreadPublisher(conf); + queueMode = conf.get("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST); // check that the mode is known @@ -254,6 +263,13 @@ public class FetcherThread extends Thread { // fetch the page redirecting = false; redirectCount = 0; + + //Publisher event + if(activatePublisher) { + FetcherThreadEvent startEvent = new FetcherThreadEvent(PublishEventType.START, fit.getUrl().toString()); + publisher.publish(startEvent, conf); + } + do { if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url + " (queue crawl delay=" @@ -322,7 +338,13 @@ public class FetcherThread extends Thread { fetchNode.setFetchTime(System.currentTimeMillis()); fetchNode.setUrl(fit.url); } - + + //Publish fetch finish event + if(activatePublisher) { + FetcherThreadEvent endEvent = new FetcherThreadEvent(PublishEventType.END, fit.getUrl().toString()); + endEvent.addEventData("status", status.getName()); + publisher.publish(endEvent, conf); + } reporter.incrCounter("FetcherStatus", status.getName(), 1); switch (status.getCode()) { @@ -688,7 +710,18 @@ public class FetcherThread extends Thread { outlinkList.add(links[i]); outlinks.add(toUrl); } - + + //Publish fetch report event + if(activatePublisher) { + FetcherThreadEvent reportEvent = new FetcherThreadEvent(PublishEventType.REPORT, url.toString()); + reportEvent.addOutlinksToEventData(outlinkList); + reportEvent.addEventData(Nutch.FETCH_EVENT_TITLE, parseData.getTitle()); + reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTTYPE, parseData.getContentMeta().get("content-type")); + reportEvent.addEventData(Nutch.FETCH_EVENT_SCORE, datum.getScore()); + 
reportEvent.addEventData(Nutch.FETCH_EVENT_FETCHTIME, datum.getFetchTime()); + reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG, parseData.getContentMeta().get("content-language")); + publisher.publish(reportEvent, conf); + } // Only process depth N outlinks if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) { reporter.incrCounter("FetcherOutlinks", "outlinks_detected", http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java b/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java new file mode 100644 index 0000000..c5028cd --- /dev/null +++ b/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.fetcher; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.parse.Outlink; + +/** + * This class is used to capture the various events occurring + * at fetch time. These events are sent to a queue implementing the publisher + * + */ +public class FetcherThreadEvent implements Serializable{ + + /** Type of event to specify start, end or reporting of a fetch item. **/ + public static enum PublishEventType {START, END, REPORT} + + private PublishEventType eventType; + private Map<String, Object> eventData; + private String url; + private Long timestamp; + + /** + * Constructor to create an event to be published + * @param eventType Type of {@link #eventType event} being created + * @param url URL of the fetched page to which this event belongs + */ + public FetcherThreadEvent(PublishEventType eventType, String url) { + this.eventType = eventType; + this.url = url; + this.timestamp = System.currentTimeMillis(); + } + + /** + * Get type of this event object + * @return {@link PublishEventType Event} type + */ + public PublishEventType getEventType() { + return eventType; + } + + /** + * Set event type of this object + * @param eventType Set {@link #eventType event} type + */ + public void setEventType(PublishEventType eventType) { + this.eventType = eventType; + } + + /** + * Get event data + * @return the map of event data, or null if no data has been set or added yet + */ + public Map<String, Object> getEventData() { + return eventData; + } + /** + * Set metadata for this event + * @param eventData A map containing important information relevant + * to this event (fetched page). 
+ * Ex - score, title, outlinks, content-type, etc + */ + public void setEventData(Map<String, Object> eventData) { + this.eventData = eventData; + } + + /** + * Get URL of this event + * @return {@link #url URL} of this event + */ + public String getUrl() { + return url; + } + + /** + * Set URL of this event (fetched page) + * @param url URL of the fetched page + */ + public void setUrl(String url) { + this.url = url; + } + /** + * Add new data to the eventData object. + * @param key A key to refer to the data being added to this event + * @param value Data to be stored in the event referenced by the above key + */ + public void addEventData(String key, Object value) { + if(eventData == null) { + eventData = new HashMap<String, Object>(); + } + eventData.put(key, value); + } + + /** + * Given a collection of outlinks, this method adds them to + * the event data under the "outlinks" key + * @param links A collection of outlinks originating from the fetched page + * this event refers to + */ + public void addOutlinksToEventData(Collection<Outlink> links) { + ArrayList<Map<String, String>> outlinkList = new ArrayList<>(); + for(Outlink link: links) { + Map<String, String> outlink = new HashMap<>(); + outlink.put("url", link.getToUrl()); + outlink.put("anchor", link.getAnchor()); + outlinkList.add(outlink); + } + this.addEventData("outlinks", outlinkList); + } + + /** + * Get timestamp of current event. 
+ * @return {@link #timestamp Timestamp} + */ + public Long getTimestamp() { + return timestamp; + } + + /** + * Set timestamp for this event + * @param timestamp Timestamp of the occurrence of this event + */ + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java b/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java new file mode 100644 index 0000000..2832017 --- /dev/null +++ b/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.publisher.NutchPublishers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles the publishing of the events to the queue implementation. 
+ * + */ +public class FetcherThreadPublisher { + + private static NutchPublishers publisher; + private static final Logger LOG = LoggerFactory.getLogger(FetcherThreadPublisher.class); + + /** + * Configure all registered publishers + * @param conf {@link org.apache.hadoop.conf.Configuration Configuration} to be used + */ + public FetcherThreadPublisher(Configuration conf) { + LOG.info("Setting up publishers"); + publisher = new NutchPublishers(conf); + if(!publisher.setConfig(conf)) + publisher = null; + } + + /** + * Publish event to all registered publishers + * @param event {@link org.apache.nutch.fetcher.FetcherThreadEvent Event} to be published + * @param conf {@link org.apache.hadoop.conf.Configuration Configuration} to be used + */ + public void publish(FetcherThreadEvent event, Configuration conf) { + if(publisher!=null) { + publisher.publish(event, conf); + } + else { + LOG.warn("Could not instantiate publisher implementation, continuing without publishing"); + } + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/metadata/Nutch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/metadata/Nutch.java b/src/java/org/apache/nutch/metadata/Nutch.java index cbc3317..7ad0b5e 100644 --- a/src/java/org/apache/nutch/metadata/Nutch.java +++ b/src/java/org/apache/nutch/metadata/Nutch.java @@ -97,4 +97,15 @@ public interface Nutch { public static final String ARG_SEGMENTDIR = "segment_dir"; /** Argument key to specify the location of individual segment for the REST endpoints **/ public static final String ARG_SEGMENT = "segment"; + + /** Title key in the Pub/Sub event metadata for the title of the parsed page*/ + public static final String FETCH_EVENT_TITLE = "title"; + /** Content-type key in the Pub/Sub event metadata for the content-type of the parsed page*/ + public static final String FETCH_EVENT_CONTENTTYPE = "content-type"; + /** Score key in the Pub/Sub 
event metadata for the score of the parsed page*/ + public static final String FETCH_EVENT_SCORE = "score"; + /** Fetch time key in the Pub/Sub event metadata for the fetch time of the parsed page*/ + public static final String FETCH_EVENT_FETCHTIME = "fetchTime"; + /** Content-language key in the Pub/Sub event metadata for the content-language of the parsed page*/ + public static final String FETCH_EVENT_CONTENTLANG = "content-language"; } http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/publisher/NutchPublisher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/publisher/NutchPublisher.java b/src/java/org/apache/nutch/publisher/NutchPublisher.java new file mode 100644 index 0000000..75281ad --- /dev/null +++ b/src/java/org/apache/nutch/publisher/NutchPublisher.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.publisher; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.plugin.Pluggable; + +/** + * All publisher subscriber model implementations should implement this interface. 
+ * + */ +public interface NutchPublisher extends Configurable, Pluggable { + + + public final static String X_POINT_ID = NutchPublisher.class.getName(); + + /** + * Use implementation specific configurations + * @param conf {@link org.apache.hadoop.conf.Configuration Configuration} to be used + */ + public boolean setConfig(Configuration conf); + + /** + * This method publishes the event. Make sure that the event is a Java POJO to avoid + * Jackson JSON conversion errors. Currently we use the FetcherThreadEvent + * @param event + */ + public void publish(Object event, Configuration conf); + + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/publisher/NutchPublishers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/publisher/NutchPublishers.java b/src/java/org/apache/nutch/publisher/NutchPublishers.java new file mode 100644 index 0000000..b79c217 --- /dev/null +++ b/src/java/org/apache/nutch/publisher/NutchPublishers.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.publisher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.nutch.plugin.PluginRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NutchPublishers extends Configured implements NutchPublisher{ + + private static final Logger LOG = LoggerFactory.getLogger(NutchPublishers.class); + private NutchPublisher[] publishers; + private Configuration conf; + + public NutchPublishers(Configuration conf) { + this.conf = conf; + this.publishers = (NutchPublisher[])PluginRepository.get(conf). + getOrderedPlugins(NutchPublisher.class, + NutchPublisher.X_POINT_ID, "publisher.order"); + } + + @Override + public boolean setConfig(Configuration conf) { + boolean success = false; + try { + for(int i=0; i<this.publishers.length; i++) { + success |= this.publishers[i].setConfig(conf); + if(success) + LOG.info("Successfully loaded {} publisher", + this.publishers[i].getClass().getName()); + } + }catch(Exception e) { + LOG.warn("Error while loading publishers : {}", e.getMessage()); + } + if(!success) { + LOG.warn("Could not load any publishers out of {} publishers", + this.publishers.length); + } + return success; + } + + @Override + public void publish(Object event, Configuration conf) { + for(int i=0; i<this.publishers.length; i++) { + try{ + this.publishers[i].publish(event, conf); + }catch(Exception e){ + LOG.warn("Could not post event to {}", + this.publishers[i].getClass().getName()); + } + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration arg0) { + + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/build.xml ---------------------------------------------------------------------- diff --git a/src/plugin/build.xml b/src/plugin/build.xml index 20ef870..1cd2743 100755 --- a/src/plugin/build.xml +++ b/src/plugin/build.xml @@ -65,6 +65,7 @@ <ant dir="parse-swf" 
target="deploy"/> <ant dir="parse-tika" target="deploy"/> <ant dir="parse-zip" target="deploy"/> + <ant dir="publish-rabbitmq" target="deploy"/> <ant dir="scoring-depth" target="deploy"/> <ant dir="scoring-opic" target="deploy"/> <ant dir="scoring-link" target="deploy"/> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/nutch-extensionpoints/plugin.xml ---------------------------------------------------------------------- diff --git a/src/plugin/nutch-extensionpoints/plugin.xml b/src/plugin/nutch-extensionpoints/plugin.xml index 8cf7a23..b6a2e9d 100644 --- a/src/plugin/nutch-extensionpoints/plugin.xml +++ b/src/plugin/nutch-extensionpoints/plugin.xml @@ -64,4 +64,8 @@ id="org.apache.nutch.segment.SegmentMergeFilter" name="Nutch Segment Merge Filter"/> +<extension-point + id="org.apache.nutch.publisher.NutchPublisher" + name="Nutch Publisher"/> + </plugin> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/build-ivy.xml ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/build-ivy.xml b/src/plugin/publish-rabbitmq/build-ivy.xml new file mode 100644 index 0000000..683e757 --- /dev/null +++ b/src/plugin/publish-rabbitmq/build-ivy.xml @@ -0,0 +1,54 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<project name="publish-rabbitmq" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant"> + + <property name="ivy.install.version" value="2.1.0" /> + <condition property="ivy.home" value="${env.IVY_HOME}"> + <isset property="env.IVY_HOME" /> + </condition> + <property name="ivy.home" value="${user.home}/.ant" /> + <property name="ivy.checksums" value="" /> + <property name="ivy.jar.dir" value="${ivy.home}/lib" /> + <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar" /> + + <target name="download-ivy" unless="offline"> + + <mkdir dir="${ivy.jar.dir}"/> + <!-- download Ivy from web site so that it can be used even without any special installation --> + <get src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar" + dest="${ivy.jar.file}" usetimestamp="true"/> + </target> + + <target name="init-ivy" depends="download-ivy"> + <!-- try to load ivy here from ivy home, in case the user has not already dropped + it into ant's lib dir (note that the latter copy will always take precedence). + We will not fail as long as local lib dir exists (it may be empty) and + ivy is in at least one of ant's lib dir or the local lib dir. 
--> + <path id="ivy.lib.path"> + <fileset dir="${ivy.jar.dir}" includes="*.jar"/> + + </path> + <taskdef resource="org/apache/ivy/ant/antlib.xml" + uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/> + </target> + + <target name="deps-jar" depends="init-ivy"> + <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/> + </target> + +</project> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/build.xml ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/build.xml b/src/plugin/publish-rabbitmq/build.xml new file mode 100644 index 0000000..9b48aa0 --- /dev/null +++ b/src/plugin/publish-rabbitmq/build.xml @@ -0,0 +1,27 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project name="publish-rabbitmq" default="jar-core"> + + <import file="../build-plugin.xml"/> + + <!-- Deploy Unit test dependencies --> + <target name="deps-test"> + <ant target="deploy" inheritall="false" dir="../nutch-extensionpoints"/> + </target> + +</project> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/ivy.xml ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/ivy.xml b/src/plugin/publish-rabbitmq/ivy.xml new file mode 100644 index 0000000..589786f --- /dev/null +++ b/src/plugin/publish-rabbitmq/ivy.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" ?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<ivy-module version="1.0"> + <info organisation="org.apache.nutch" module="${ant.project.name}"> + <license name="Apache 2.0"/> + <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/> + <description> + Apache Nutch + </description> + </info> + + <configurations> + <include file="../../..//ivy/ivy-configurations.xml"/> + </configurations> + + <publications> + <!--get the artifact from our module name--> + <artifact conf="master"/> + </publications> + + <dependencies> + <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" conf="*->default" /> + </dependencies> + +</ivy-module> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/plugin.xml ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/plugin.xml b/src/plugin/publish-rabbitmq/plugin.xml new file mode 100644 index 0000000..56c2d8b --- /dev/null +++ b/src/plugin/publish-rabbitmq/plugin.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<plugin + id="publish-rabbitmq" + name="RabbitMQ implementation" + version="1.0.0" + provider-name="nutch.org"> + + + <runtime> + <library name="publish-rabbitmq.jar"> + <export name="*"/> + </library> + </runtime> + + <requires> + <import plugin="nutch-extensionpoints"/> + </requires> + + <extension id="org.apache.nutch.publisher.rabbitmq" + name="RabbitMQPublisherImpl" + point="org.apache.nutch.publisher.NutchPublisher"> + + <implementation id="RabbitMQPublisherImpl" + class="org.apache.nutch.publisher.rabbitmq.RabbitMQPublisherImpl" /> + </extension> + +</plugin> http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java new file mode 100644 index 0000000..3fafbaf --- /dev/null +++ b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.publisher.rabbitmq; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.publisher.NutchPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class RabbitMQPublisherImpl implements NutchPublisher{ + + private static String EXCHANGE_SERVER; + private static String EXCHANGE_TYPE; + private static String HOST; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQPublisherImpl.class); + private static Channel channel; + + @Override + public boolean setConfig(Configuration conf) { + try{ + EXCHANGE_SERVER = conf.get("rabbitmq.exchange.server", "fetcher_log"); + EXCHANGE_TYPE = conf.get("rabbitmq.exchange.type", "fanout"); + HOST = conf.get("rabbitmq.host", "localhost"); + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(HOST); + + Connection connection = factory.newConnection(); + channel = connection.createChannel(); + channel.exchangeDeclare(EXCHANGE_SERVER, EXCHANGE_TYPE); + LOG.info("Configured RabbitMQ publisher"); + return true; + }catch(Exception e) { + LOG.error("Could not initialize RabbitMQ publisher - {}", StringUtils.stringifyException(e)); + return false; + } + + } + + @Override + public void publish(Object event, Configuration conf) { + String rountingKey = conf.get("rabbitmq.queue.routingkey", ""); + try { + channel.basicPublish(EXCHANGE_SERVER, rountingKey, null, getJSONString(event).getBytes()); + } catch (Exception e) { + LOG.error("Error occured while publishing - {}", StringUtils.stringifyException(e)); + } + } + + private String getJSONString(Object 
obj) { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + LOG.error("Error converting event object to JSON String - {}", StringUtils.stringifyException(e)); + } + return null; + } + + + @Override + public void setConf(Configuration arg0) { + + } + + @Override + public Configuration getConf() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java ---------------------------------------------------------------------- diff --git a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java new file mode 100644 index 0000000..9f49c7f --- /dev/null +++ b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Publisher package to implement queues + */ +package org.apache.nutch.publisher.rabbitmq; +
