Repository: nutch
Updated Branches:
  refs/heads/master 70622c3e1 -> e53b34b23


Fix for NUTCH-2132: Publisher/Subscriber model for Nutch to emit events, this 
closes #138


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/e53b34b2
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/e53b34b2
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/e53b34b2

Branch: refs/heads/master
Commit: e53b34b2322f2d071981a72577644a225642ecbc
Parents: 70622c3
Author: Sujen Shah <[email protected]>
Authored: Wed Sep 7 09:11:18 2016 -0700
Committer: Sujen Shah <[email protected]>
Committed: Wed Sep 7 09:13:11 2016 -0700

----------------------------------------------------------------------
 build.xml                                       |   3 +
 conf/nutch-default.xml                          |  57 +++++++
 ivy/ivy.xml                                     |   3 +
 .../org/apache/nutch/fetcher/FetcherThread.java |  37 ++++-
 .../nutch/fetcher/FetcherThreadEvent.java       | 147 +++++++++++++++++++
 .../nutch/fetcher/FetcherThreadPublisher.java   |  58 ++++++++
 src/java/org/apache/nutch/metadata/Nutch.java   |  11 ++
 .../apache/nutch/publisher/NutchPublisher.java  |  46 ++++++
 .../apache/nutch/publisher/NutchPublishers.java |  80 ++++++++++
 src/plugin/build.xml                            |   1 +
 src/plugin/nutch-extensionpoints/plugin.xml     |   4 +
 src/plugin/publish-rabbitmq/build-ivy.xml       |  54 +++++++
 src/plugin/publish-rabbitmq/build.xml           |  27 ++++
 src/plugin/publish-rabbitmq/ivy.xml             |  42 ++++++
 src/plugin/publish-rabbitmq/plugin.xml          |  43 ++++++
 .../rabbitmq/RabbitMQPublisherImpl.java         |  92 ++++++++++++
 .../nutch/publisher/rabbitmq/package-info.java  |  22 +++
 17 files changed, 725 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 0ee60a1..8c9e778 100644
--- a/build.xml
+++ b/build.xml
@@ -209,6 +209,7 @@
       <packageset dir="${plugins.dir}/protocol-httpclient/src/java"/>
       <packageset dir="${plugins.dir}/protocol-interactiveselenium/src/java"/>
       <packageset dir="${plugins.dir}/protocol-selenium/src/java"/>
+      <packageset dir="${plugins.dir}/publish-rabbitmq/src/java"/>
       <packageset dir="${plugins.dir}/scoring-depth/src/java"/>
       <packageset dir="${plugins.dir}/scoring-similarity/src/java"/>
       <packageset dir="${plugins.dir}/scoring-link/src/java"/>
@@ -652,6 +653,7 @@
       <packageset dir="${plugins.dir}/protocol-http/src/java"/>
       <packageset dir="${plugins.dir}/protocol-httpclient/src/java"/>
       <packageset dir="${plugins.dir}/protocol-selenium/src/java"/>
+      <packageset dir="${plugins.dir}/publish-rabbitmq/src/java"/>
       <packageset dir="${plugins.dir}/scoring-depth/src/java"/>
       <packageset dir="${plugins.dir}/scoring-similarity/src/java"/>
       <packageset dir="${plugins.dir}/scoring-link/src/java"/>
@@ -1073,6 +1075,7 @@
         <source path="${plugins.dir}/protocol-http/src/java/" />
         <source path="${plugins.dir}/protocol-http/src/test/" />
         <source path="${plugins.dir}/protocol-selenium/src/java"/>
+        <source path="${plugins.dir}/publish-rabbitmq/src/java"/>
         <source path="${plugins.dir}/scoring-depth/src/java/" />
         <source path="${plugins.dir}/scoring-similarity/src/java/" />
         <source path="${plugins.dir}/scoring-link/src/java/" />

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/conf/nutch-default.xml
----------------------------------------------------------------------
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index ec9d2d4..ea7df89 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -983,6 +983,7 @@
 </property>
 
 <property>
+
   <name>fetcher.store.robotstxt</name>
   <value>false</value>
   <description>If true (and fetcher.store.content is also true),
@@ -992,6 +993,13 @@
   </description>
 </property>
 
+<property>
+       <name>fetcher.publisher</name>
+       <value>false</value>
+       <description>Set this value to true if you want to use an 
implementation of the Publisher/Subscriber model. Make sure to set corresponding
+       Publisher implementation specific properties</description>
+</property> 
+
 <!-- moreindexingfilter plugin properties -->
 
 <property>
@@ -2253,4 +2261,53 @@ visit 
https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   </description>
 </property>
 
+<!-- publisher properties 
+      Do not forget to add the name of your publisher implementation 
+      in plugin.includes ex- publish-rabbitmq -->
+<property>
+  <name>publisher.queue.type</name>
+  <value></value>
+  <description>
+    Choose the type of Queue being used (ex - RabbitMQ, ActiveMq, Kafka, etc). 
+    Currently there exists an implementation for RabbitMQ producer. 
+  </description>
+</property>
+<property>
+  <name>publisher.order</name>
+  <value></value>
+  <description>
+    The order in which the publisher queues would be loaded
+  </description>
+</property>
+<!-- RabbitMQ properties -->
+<property>
+  <name>rabbitmq.exchange.server</name>
+  <value></value>
+  <description>
+    Name for the exchange server to use. Default - "fetcher_log"
+  </description>
+</property>
+<property>
+  <name>rabbitmq.exchange.type</name>
+  <value></value>
+  <description>
+    There are a few exchange types available: direct, topic, headers and 
fanout. Default "fanout".
+  </description>
+</property>
+<property>
+  <name>rabbitmq.host</name>
+  <value></value>
+  <description>
+    Host on which the RabbitMQ server is running. Default "localhost".
+  </description>
+</property>
+<property>
+  <name>rabbitmq.queue.routingkey</name>
+  <value></value>
+  <description>
+    The routingKey used by publisher to publish messages to specific queues. 
If the exchange type is "fanout", then this property is ignored. 
+  </description>
+</property>
+
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/ivy/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy/ivy.xml b/ivy/ivy.xml
index a9a83ae..adc7e91 100644
--- a/ivy/ivy.xml
+++ b/ivy/ivy.xml
@@ -127,6 +127,9 @@
        <dependency org="org.apache.wicket" name="wicket-spring" rev="6.16.0" 
conf="*->default" />
        <dependency org="de.agilecoders.wicket" name="wicket-bootstrap-core" 
rev="0.9.2" conf="*->default" />
        <dependency org="de.agilecoders.wicket" 
name="wicket-bootstrap-extensions" rev="0.9.2" conf="*->default" />
+               
+       <!-- RabbitMQ dependencies -->
+       <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" 
conf="*->default" />
 
                <!--global exclusion -->
                <exclude module="jmxtools" />

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java 
b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 449e220..5a1425d 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.NutchWritable;
 import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLExemptionFilters;
@@ -135,6 +136,10 @@ public class FetcherThread extends Thread {
   //Used by the REST service
   private FetchNode fetchNode;
   private boolean reportToNutchServer;
+  
+  //Used for publishing events
+  private FetcherThreadPublisher publisher;
+  private boolean activatePublisher;
 
   public FetcherThread(Configuration conf, AtomicInteger activeThreads, 
FetchItemQueues fetchQueues, 
       QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong 
lastRequestStart, Reporter reporter,
@@ -164,6 +169,10 @@ public class FetcherThread extends Thread {
     this.storingContent = storingContent;
     this.pages = pages;
     this.bytes = bytes;
+
+    if((activatePublisher=conf.getBoolean("fetcher.publisher", false)))
+      this.publisher = new FetcherThreadPublisher(conf);
+    
     queueMode = conf.get("fetcher.queue.mode",
         FetchItemQueues.QUEUE_MODE_HOST);
     // check that the mode is known
@@ -254,6 +263,13 @@ public class FetcherThread extends Thread {
           // fetch the page
           redirecting = false;
           redirectCount = 0;
+          
+          //Publisher event
+          if(activatePublisher) {
+            FetcherThreadEvent startEvent = new 
FetcherThreadEvent(PublishEventType.START, fit.getUrl().toString());
+            publisher.publish(startEvent, conf);
+          }
+          
           do {
             if (LOG.isInfoEnabled()) {
               LOG.info("fetching " + fit.url + " (queue crawl delay="
@@ -322,7 +338,13 @@ public class FetcherThread extends Thread {
               fetchNode.setFetchTime(System.currentTimeMillis());
               fetchNode.setUrl(fit.url);
             }
-
+            
+            //Publish fetch finish event
+            if(activatePublisher) {
+              FetcherThreadEvent endEvent = new 
FetcherThreadEvent(PublishEventType.END, fit.getUrl().toString());
+              endEvent.addEventData("status", status.getName());
+              publisher.publish(endEvent, conf);
+            }
             reporter.incrCounter("FetcherStatus", status.getName(), 1);
 
             switch (status.getCode()) {
@@ -688,7 +710,18 @@ public class FetcherThread extends Thread {
             outlinkList.add(links[i]);
             outlinks.add(toUrl);
           }
-
+          
+          //Publish fetch report event 
+          if(activatePublisher) {
+            FetcherThreadEvent reportEvent = new 
FetcherThreadEvent(PublishEventType.REPORT, url.toString());
+            reportEvent.addOutlinksToEventData(outlinkList);
+            reportEvent.addEventData(Nutch.FETCH_EVENT_TITLE, 
parseData.getTitle());
+            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTTYPE, 
parseData.getContentMeta().get("content-type"));
+            reportEvent.addEventData(Nutch.FETCH_EVENT_SCORE, 
datum.getScore());
+            reportEvent.addEventData(Nutch.FETCH_EVENT_FETCHTIME, 
datum.getFetchTime());
+            reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG, 
parseData.getContentMeta().get("content-language"));
+            publisher.publish(reportEvent, conf);
+          }
           // Only process depth N outlinks
           if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
             reporter.incrCounter("FetcherOutlinks", "outlinks_detected",

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java 
b/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java
new file mode 100644
index 0000000..c5028cd
--- /dev/null
+++ b/src/java/org/apache/nutch/fetcher/FetcherThreadEvent.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.Outlink;
+
+/**
+ * This class is used to capture the various events occurring 
+ * at fetch time. These events are sent to a queue implementing the publisher
+ *
+ */
+public class FetcherThreadEvent implements Serializable{
+
+  /** Type of event to specify start, end or reporting of a fetch item.  **/
+  public static enum PublishEventType {START, END, REPORT}
+  
+  private PublishEventType eventType;
+  private Map<String, Object> eventData;
+  private String url; 
+  private Long timestamp; 
+  
+  /**
+   * Constructor to create an event to be published
+   * @param eventType  Type of {@link #eventType event} being created 
+   * @param url        URL of the fetched page to which this event belongs
+   */
+  public FetcherThreadEvent(PublishEventType eventType, String url) {
+    this.eventType = eventType;
+    this.url = url;
+    this.timestamp = System.currentTimeMillis();
+  }
+  
+  /**
+   * Get type of this event object
+   * @return {@link PublishEventType Event} type
+   */
+  public PublishEventType getEventType() {
+    return eventType;
+  }
+  
+  /**
+   * Set event type of this object
+   * @param eventType  Set {@link #eventType event} type
+   */
+  public void setEventType(PublishEventType eventType) {
+    this.eventType = eventType;
+  }
+  
+  /**
+   * Get event data
+   * @return
+   */
+  public Map<String, Object> getEventData() {
+    return eventData;
+  }
+  /** 
+   * Set metadata to this event
+   * @param eventData  A map containing important information relevant 
+   *                                   to this event (fetched page).
+   *                                   Ex - score, title, outlinks, 
content-type, etc
+   */
+  public void setEventData(Map<String, Object> eventData) {
+    this.eventData = eventData;
+  }
+  
+  /**
+   * Get URL of this event
+   * @return {@link #url URL} of this event
+   */
+  public String getUrl() {
+    return url;
+  }
+  
+  /**
+   * Set URL of this event (fetched page)
+   * @param url        URL of the fetched page
+   */
+  public void setUrl(String url) {
+    this.url = url;
+  }
+  /**
+   * Add new data to the eventData object. 
+   * @param key        A key to refer to the data being added to this event
+   * @param value      Data to be stored in the event referenced by the above 
key
+   */
+  public void addEventData(String key, Object value) {
+    if(eventData == null) {
+      eventData = new HashMap<String, Object>();
+    }
+    eventData.put(key, value);
+  }
+  
+  /**
+   * Given a collection of outlinks, this method adds them to 
+   * the outlink metadata of this event
+   * @param links      A collection of outlinks generating from the fetched 
page
+   *                           this event refers to
+   */
+  public void addOutlinksToEventData(Collection<Outlink> links) {
+    ArrayList<Map<String, String>> outlinkList = new ArrayList<>();
+    for(Outlink link: links) {
+      Map<String, String> outlink = new HashMap<>();
+      outlink.put("url", link.getToUrl());
+      outlink.put("anchor", link.getAnchor());
+      outlinkList.add(outlink);
+    }
+    this.addEventData("outlinks", outlinkList);
+  }
+  
+  /**
+   * Get timestamp of current event. 
+   * @return {@link #timestamp Timestamp}
+   */
+  public Long getTimestamp() {
+    return timestamp;
+  }
+  
+  /**
+   * Set timestamp for this event
+   * @param timestamp  Timestamp of the occurrence of this event
+   */
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java 
b/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java
new file mode 100644
index 0000000..2832017
--- /dev/null
+++ b/src/java/org/apache/nutch/fetcher/FetcherThreadPublisher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.publisher.NutchPublishers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles the publishing of the events to the queue 
implementation. 
+ *
+ */
+public class FetcherThreadPublisher {
+
+  private static NutchPublishers publisher;
+  private static final Logger LOG = 
LoggerFactory.getLogger(FetcherThreadPublisher.class);
+
+  /**
+   * Configure all registered publishers
+   * @param conf {@link org.apache.hadoop.conf.Configuration Configuration} to 
be used
+   */
+  public FetcherThreadPublisher(Configuration conf) {
+    LOG.info("Setting up publishers");
+    publisher = new NutchPublishers(conf);
+    if(!publisher.setConfig(conf))
+      publisher = null;
+  }
+
+  /**
+   * Publish event to all registered publishers
+   * @param event      {@link org.apache.nutch.fetcher.FetcherThreadEvent 
Event} to be published
+   * @param conf       {@link org.apache.hadoop.conf.Configuration 
Configuration} to be used
+   */
+  public void publish(FetcherThreadEvent event, Configuration conf) {
+    if(publisher!=null) {
+      publisher.publish(event, conf);
+    }
+    else {
+      LOG.warn("Could not instantiate publisher implementation, continuing 
without publishing");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/metadata/Nutch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/metadata/Nutch.java 
b/src/java/org/apache/nutch/metadata/Nutch.java
index cbc3317..7ad0b5e 100644
--- a/src/java/org/apache/nutch/metadata/Nutch.java
+++ b/src/java/org/apache/nutch/metadata/Nutch.java
@@ -97,4 +97,15 @@ public interface Nutch {
        public static final String ARG_SEGMENTDIR = "segment_dir";
        /** Argument key to specify the location of individual segment for the 
REST endpoints **/
        public static final String ARG_SEGMENT = "segment";
+       
+       /** Title key in the Pub/Sub event metadata for the title of the parsed 
page*/
+       public static final String FETCH_EVENT_TITLE = "title";
+       /** Content-type key in the Pub/Sub event metadata for the content-type 
of the parsed page*/
+       public static final String FETCH_EVENT_CONTENTTYPE = "content-type";
+       /** Score key in the Pub/Sub event metadata for the score of the parsed 
page*/
+       public static final String FETCH_EVENT_SCORE = "score";
+       /** Fetch time key in the Pub/Sub event metadata for the fetch time of 
the parsed page*/
+       public static final String FETCH_EVENT_FETCHTIME = "fetchTime";
+       /** Content-language key in the Pub/Sub event metadata for the 
content-language of the parsed page*/
+       public static final String FETCH_EVENT_CONTENTLANG = "content-language";
 }

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/publisher/NutchPublisher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/publisher/NutchPublisher.java 
b/src/java/org/apache/nutch/publisher/NutchPublisher.java
new file mode 100644
index 0000000..75281ad
--- /dev/null
+++ b/src/java/org/apache/nutch/publisher/NutchPublisher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.publisher;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.plugin.Pluggable;
+
+/**
+ * All publisher subscriber model implementations should implement this 
interface. 
+ *
+ */
+public interface NutchPublisher extends Configurable, Pluggable {
+
+
+  public final static String X_POINT_ID = NutchPublisher.class.getName();
+
+  /**
+   * Use implementation specific configurations
+   * @param conf       {@link org.apache.hadoop.conf.Configuration 
Configuration} to be used
+   */
+  public boolean setConfig(Configuration conf);
+
+  /**
+   * This method publishes the event. Make sure that the event is a Java POJO 
to avoid 
+   * Jackson JSON conversion errors. Currently we use the FetcherThreadEvent
+   * @param event
+   */
+  public void publish(Object event, Configuration conf);
+
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/java/org/apache/nutch/publisher/NutchPublishers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/publisher/NutchPublishers.java 
b/src/java/org/apache/nutch/publisher/NutchPublishers.java
new file mode 100644
index 0000000..b79c217
--- /dev/null
+++ b/src/java/org/apache/nutch/publisher/NutchPublishers.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.publisher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.nutch.plugin.PluginRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NutchPublishers extends Configured implements NutchPublisher{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NutchPublishers.class);
+  private NutchPublisher[] publishers;
+  private Configuration conf;
+
+  public NutchPublishers(Configuration conf) {
+       this.conf = conf;
+    this.publishers = (NutchPublisher[])PluginRepository.get(conf).
+        getOrderedPlugins(NutchPublisher.class, 
+            NutchPublisher.X_POINT_ID, "publisher.order");
+  }
+
+  @Override
+  public boolean setConfig(Configuration conf) {
+    boolean success = false;
+    try {
+      for(int i=0; i<this.publishers.length; i++) {
+        success |= this.publishers[i].setConfig(conf);
+        if(success)
+          LOG.info("Successfully loaded {} publisher", 
+              this.publishers[i].getClass().getName());
+      }
+    }catch(Exception e) {
+      LOG.warn("Error while loading publishers : {}", e.getMessage());
+    }
+    if(!success) {
+      LOG.warn("Could not load any publishers out of {} publishers",  
+          this.publishers.length);
+    }
+    return success;
+  }
+
+  @Override
+  public void publish(Object event, Configuration conf) {
+    for(int i=0; i<this.publishers.length; i++) {
+      try{
+        this.publishers[i].publish(event, conf);
+      }catch(Exception e){
+        LOG.warn("Could not post event to {}", 
+            this.publishers[i].getClass().getName());
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration arg0) {
+         
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 20ef870..1cd2743 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -65,6 +65,7 @@
      <ant dir="parse-swf" target="deploy"/>
      <ant dir="parse-tika" target="deploy"/>
      <ant dir="parse-zip" target="deploy"/>
+     <ant dir="publish-rabbitmq" target="deploy"/>
      <ant dir="scoring-depth" target="deploy"/>
      <ant dir="scoring-opic" target="deploy"/>
      <ant dir="scoring-link" target="deploy"/>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/nutch-extensionpoints/plugin.xml
----------------------------------------------------------------------
diff --git a/src/plugin/nutch-extensionpoints/plugin.xml 
b/src/plugin/nutch-extensionpoints/plugin.xml
index 8cf7a23..b6a2e9d 100644
--- a/src/plugin/nutch-extensionpoints/plugin.xml
+++ b/src/plugin/nutch-extensionpoints/plugin.xml
@@ -64,4 +64,8 @@
       id="org.apache.nutch.segment.SegmentMergeFilter"
       name="Nutch Segment Merge Filter"/>
 
+<extension-point
+      id="org.apache.nutch.publisher.NutchPublisher"
+      name="Nutch Publisher"/>
+
 </plugin>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/build-ivy.xml
----------------------------------------------------------------------
diff --git a/src/plugin/publish-rabbitmq/build-ivy.xml 
b/src/plugin/publish-rabbitmq/build-ivy.xml
new file mode 100644
index 0000000..683e757
--- /dev/null
+++ b/src/plugin/publish-rabbitmq/build-ivy.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="publish-rabbitmq" default="deps-jar" 
xmlns:ivy="antlib:org.apache.ivy.ant">
+
+    <property name="ivy.install.version" value="2.1.0" />
+    <condition property="ivy.home" value="${env.IVY_HOME}">
+      <isset property="env.IVY_HOME" />
+    </condition>
+    <property name="ivy.home" value="${user.home}/.ant" />
+    <property name="ivy.checksums" value="" />
+    <property name="ivy.jar.dir" value="${ivy.home}/lib" />
+    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar" />
+
+    <target name="download-ivy" unless="offline">
+
+        <mkdir dir="${ivy.jar.dir}"/>
+        <!-- download Ivy from web site so that it can be used even without 
any special installation -->
+        <get 
src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar"
 
+             dest="${ivy.jar.file}" usetimestamp="true"/>
+    </target>
+
+    <target name="init-ivy" depends="download-ivy">
+      <!-- try to load ivy here from ivy home, in case the user has not 
already dropped
+              it into ant's lib dir (note that the latter copy will always 
take precedence).
+              We will not fail as long as local lib dir exists (it may be 
empty) and
+              ivy is in at least one of ant's lib dir or the local lib dir. -->
+        <path id="ivy.lib.path">
+            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
+
+        </path>
+        <taskdef resource="org/apache/ivy/ant/antlib.xml"
+                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
+    </target>
+
+  <target name="deps-jar" depends="init-ivy">
+    <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/>
+  </target>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/build.xml
----------------------------------------------------------------------
diff --git a/src/plugin/publish-rabbitmq/build.xml 
b/src/plugin/publish-rabbitmq/build.xml
new file mode 100644
index 0000000..9b48aa0
--- /dev/null
+++ b/src/plugin/publish-rabbitmq/build.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="publish-rabbitmq" default="jar-core">
+
+  <import file="../build-plugin.xml"/>
+
+  <!-- Deploy Unit test dependencies -->
+  <target name="deps-test">
+    <ant target="deploy" inheritall="false" dir="../nutch-extensionpoints"/>
+  </target>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/ivy.xml
----------------------------------------------------------------------
diff --git a/src/plugin/publish-rabbitmq/ivy.xml 
b/src/plugin/publish-rabbitmq/ivy.xml
new file mode 100644
index 0000000..589786f
--- /dev/null
+++ b/src/plugin/publish-rabbitmq/ivy.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <!-- The module name is taken from the Ant project that loads this file. -->
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <!-- Reuse the configurations declared in the shared ivy-configurations.xml. -->
+  <configurations>
+    <include file="../../..//ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!-- Get the artifact from our module name. -->
+    <artifact conf="master"/>
+  </publications>
+
+  <!-- RabbitMQ Java client (com.rabbitmq:amqp-client) used by this plugin. -->
+  <dependencies>
+    <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" 
conf="*->default" />
+  </dependencies>
+  
+</ivy-module>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/plugin.xml
----------------------------------------------------------------------
diff --git a/src/plugin/publish-rabbitmq/plugin.xml 
b/src/plugin/publish-rabbitmq/plugin.xml
new file mode 100644
index 0000000..56c2d8b
--- /dev/null
+++ b/src/plugin/publish-rabbitmq/plugin.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+  Plugin descriptor for publish-rabbitmq: registers RabbitMQPublisherImpl
+  as an implementation of the org.apache.nutch.publisher.NutchPublisher
+  extension point.
+-->
+<plugin
+   id="publish-rabbitmq"
+   name="RabbitMQ implementation"
+   version="1.0.0"
+   provider-name="nutch.org">
+
+
+   <!-- Jar containing the plugin classes; all of its packages are exported. -->
+   <runtime>
+      <library name="publish-rabbitmq.jar">
+         <export name="*"/>
+      </library>
+   </runtime>
+
+   <!-- Declares a dependency on the nutch-extensionpoints plugin. -->
+   <requires>
+      <import plugin="nutch-extensionpoints"/>
+   </requires>
+   
+   <!-- Binds the implementation class to the NutchPublisher extension point. -->
+   <extension id="org.apache.nutch.publisher.rabbitmq"
+              name="RabbitMQPublisherImpl"
+              point="org.apache.nutch.publisher.NutchPublisher">
+
+      <implementation id="RabbitMQPublisherImpl"
+                      
class="org.apache.nutch.publisher.rabbitmq.RabbitMQPublisherImpl" />
+   </extension>
+
+</plugin>

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
----------------------------------------------------------------------
diff --git 
a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
 
b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
new file mode 100644
index 0000000..3fafbaf
--- /dev/null
+++ 
b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.publisher.rabbitmq;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.publisher.NutchPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RabbitMQPublisherImpl implements NutchPublisher{
+
+  private static String EXCHANGE_SERVER;
+  private static String EXCHANGE_TYPE;
+  private static String HOST;
+  private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQPublisherImpl.class);
+  private static Channel channel;
+
+  @Override
+  public boolean setConfig(Configuration conf) {
+    try{
+    EXCHANGE_SERVER = conf.get("rabbitmq.exchange.server", "fetcher_log");
+    EXCHANGE_TYPE = conf.get("rabbitmq.exchange.type", "fanout");
+    HOST = conf.get("rabbitmq.host", "localhost");
+    ConnectionFactory factory = new ConnectionFactory(); 
+    factory.setHost(HOST);
+    
+      Connection connection = factory.newConnection();
+      channel = connection.createChannel();
+      channel.exchangeDeclare(EXCHANGE_SERVER, EXCHANGE_TYPE);
+      LOG.info("Configured RabbitMQ publisher");
+      return true;
+    }catch(Exception e) {
+      LOG.error("Could not initialize RabbitMQ publisher - {}", 
StringUtils.stringifyException(e));
+      return false;
+    }
+
+  }
+
+  @Override
+  public void publish(Object event, Configuration conf) {
+    String rountingKey = conf.get("rabbitmq.queue.routingkey", "");
+    try {
+      channel.basicPublish(EXCHANGE_SERVER, rountingKey, null, 
getJSONString(event).getBytes());
+    } catch (Exception e) {
+      LOG.error("Error occured while publishing - {}", 
StringUtils.stringifyException(e));
+    }
+  }
+
+  private String getJSONString(Object obj) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(obj);
+    } catch (JsonProcessingException e) {
+      LOG.error("Error converting event object to JSON String - {}", 
StringUtils.stringifyException(e));
+    }
+    return null;
+  }
+
+
+  @Override
+  public void setConf(Configuration arg0) {
+    
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/e53b34b2/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java
----------------------------------------------------------------------
diff --git 
a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java
 
b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java
new file mode 100644
index 0000000..9f49c7f
--- /dev/null
+++ 
b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * RabbitMQ implementation of the
+ * {@link org.apache.nutch.publisher.NutchPublisher} interface: publishes
+ * events to a RabbitMQ exchange.
+ */
+package org.apache.nutch.publisher.rabbitmq;
+

Reply via email to