Author: fenglu
Date: Tue Aug 13 15:17:05 2013
New Revision: 1513543

URL: http://svn.apache.org/r1513543
Log:
NUTCH-1294 IndexClean job with solr implementation.

Added:
    nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
    
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
    
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
    nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
Modified:
    nutch/branches/2.x/conf/log4j.properties
    nutch/branches/2.x/src/bin/nutch
    nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml

Modified: nutch/branches/2.x/conf/log4j.properties
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/conf/log4j.properties?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/conf/log4j.properties (original)
+++ nutch/branches/2.x/conf/log4j.properties Tue Aug 13 15:17:05 2013
@@ -36,6 +36,8 @@ log4j.logger.org.apache.nutch.indexer.In
 log4j.logger.org.apache.nutch.indexer.solr.SolrIndexerJob=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.solr.SolrWriter=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.DeleteDuplicates=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.solr.SolrClean=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.IndexCleanerJob=INFO,cmdstdout
 log4j.logger.org.apache.nutch.crawl.WebTableReader=INFO,cmdstdout
 log4j.logger.org.apache.nutch.host.HostDbReader=INFO,cmdstdout
 log4j.logger.org.apache.nutch.parse.ParserChecker=INFO,cmdstdout

Modified: nutch/branches/2.x/src/bin/nutch
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/bin/nutch?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/src/bin/nutch (original)
+++ nutch/branches/2.x/src/bin/nutch Tue Aug 13 15:17:05 2013
@@ -60,6 +60,7 @@ if [ $# = 0 ]; then
   echo " elasticindex   run the elasticsearch indexer"
   echo " solrindex     run the solr indexer on parsed batches"
   echo " solrdedup     remove duplicates from solr"
+  echo " solrclean      configurable extension to remove various documents 
from solr"
   echo " parsechecker   check the parser for a given url"
   echo " indexchecker   check the indexing filters for a given url"
   echo " plugin        load a plugin and run one of its classes main()"
@@ -209,6 +210,8 @@ elif [ "$COMMAND" = "solrindex" ] ; then
 CLASS=org.apache.nutch.indexer.solr.SolrIndexerJob
 elif [ "$COMMAND" = "solrdedup" ] ; then
 CLASS=org.apache.nutch.indexer.solr.SolrDeleteDuplicates
+elif [ "$COMMAND" = "solrclean" ] ; then
+CLASS=org.apache.nutch.indexer.solr.SolrClean
 elif [ "$COMMAND" = "parsechecker" ] ; then
   CLASS=org.apache.nutch.parse.ParserChecker
 elif [ "$COMMAND" = "indexchecker" ] ; then

Added: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java?rev=1513543&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java 
(added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java 
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.mapreduce.StringComparator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.solr.SolrConstants;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
+
+/**
+ * Base MapReduce job that removes pages from a search index.
+ * <p>
+ * {@link CleanMapper} selects every {@link WebPage} whose status is
+ * {@link CrawlStatus#STATUS_GONE}, or that at least one configured
+ * {@link IndexCleaningFilter} asks to remove, and emits its row key. The
+ * reducer returned by {@link #getReducerClass()} receives those keys and
+ * performs the backend-specific removal.
+ */
+public abstract class IndexCleanerJob extends NutchTool implements Tool {
+
+  /** Argument / configuration key: whether removals should be committed. */
+  public static final String ARG_COMMIT = "commit";
+  public static final Logger LOG = LoggerFactory.getLogger(IndexCleanerJob.class);
+  private Configuration conf;
+
+  /** Columns the mapper always reads, regardless of the configured filters. */
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  static {
+    FIELDS.add(WebPage.Field.STATUS);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Emits {@code (key, page)} for every row that should be removed from the
+   * index. All other rows are dropped.
+   */
+  public static class CleanMapper extends
+      GoraMapper<String, WebPage, String, WebPage> {
+
+    private IndexCleaningFilters filters;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      filters = new IndexCleaningFilters(conf);
+    }
+
+    @Override
+    public void map(String key, WebPage page, Context context)
+        throws IOException, InterruptedException {
+      try {
+        // Row keys are stored as reversed URLs, while IndexCleaningFilter#remove
+        // is documented to take the page url, so unreverse before filtering.
+        // Gone pages are removed without consulting the filters at all.
+        if (page.getStatus() == CrawlStatus.STATUS_GONE
+            || filters.remove(TableUtil.unreverseUrl(key), page)) {
+          // Emit the original row key; the reducer receives it as-is.
+          context.write(key, page);
+        }
+      } catch (IndexingException e) {
+        // One failing filter must not abort the whole job: keep the page, log why.
+        LOG.warn("Error cleaning " + key, e);
+      }
+    }
+  }
+
+  /**
+   * Returns the WebPage columns the mapper needs: the status column plus
+   * every column requested by the configured cleaning filters.
+   *
+   * @param job the job whose configuration selects the filters
+   * @return a fresh, mutable set of columns
+   */
+  public Collection<WebPage.Field> getFields(Job job) {
+    Configuration conf = job.getConfiguration();
+    Collection<WebPage.Field> columns = new HashSet<WebPage.Field>(FIELDS);
+    IndexCleaningFilters filters = new IndexCleaningFilters(conf);
+    columns.addAll(filters.getFields());
+    return columns;
+  }
+
+  /**
+   * Returns the reducer class that receives the row keys selected by
+   * {@link CleanMapper}; subclasses implement the actual removal there.
+   */
+  public abstract Class<? extends Reducer<String, WebPage, NullWritable, NullWritable>> getReducerClass();
+
+  /**
+   * Configures and runs the cleaning job, blocking until it finishes.
+   *
+   * @param args must contain {@link Nutch#ARG_SOLR} (the server URL);
+   *          {@link #ARG_COMMIT} is optional and defaults to {@code true}
+   * @return the {@code results} map, after the job status was recorded into it
+   * @throws IllegalArgumentException if no server URL is given
+   */
+  @Override
+  public Map<String, Object> run(Map<String, Object> args) throws Exception {
+    String solrUrl = (String) args.get(Nutch.ARG_SOLR);
+    if (solrUrl == null) {
+      throw new IllegalArgumentException("Missing required argument: "
+          + Nutch.ARG_SOLR);
+    }
+    getConf().set(SolrConstants.SERVER_URL, solrUrl);
+    // Commit unless the caller explicitly disabled it; avoids an NPE when the
+    // argument is absent.
+    Boolean commit = (Boolean) args.get(ARG_COMMIT);
+    getConf().setBoolean(ARG_COMMIT, commit == null || commit.booleanValue());
+
+    currentJob = new NutchJob(getConf(), "index-clean");
+    currentJob.getConfiguration().setClass(
+        "mapred.output.key.comparator.class", StringComparator.class,
+        RawComparator.class);
+
+    Collection<WebPage.Field> fields = getFields(currentJob);
+    StorageUtils.initMapperJob(currentJob, fields, String.class,
+        WebPage.class, CleanMapper.class);
+    currentJob.setReducerClass(getReducerClass());
+    currentJob.setOutputFormatClass(NullOutputFormat.class);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
+  }
+
+}

Added: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java?rev=1513543&view=auto
==============================================================================
--- 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java 
(added)
+++ 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java 
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configurable;
+import org.apache.nutch.plugin.FieldPluggable;
+import org.apache.nutch.storage.WebPage;
+
+
+/**
+ * Extension point for removing documents from the index.
+ * <p>
+ * Each implementation decides, per page, whether the corresponding document
+ * should be deleted. {@link IndexCleaningFilters} consults every active
+ * implementation and treats a page as removable as soon as one of them
+ * returns {@code true}.
+ */
+public interface IndexCleaningFilter extends FieldPluggable, Configurable {
+  /** The name of the extension point. */
+  final static String X_POINT_ID = IndexCleaningFilter.class.getName();
+
+  /**
+   * Decides whether the given page should be removed from the index.
+   *
+   * @param url page url
+   * @param page the stored page; the cleaning job only loads the status column
+   *          plus the columns returned by {@code getFields()}, so other fields
+   *          may be unset
+   * @return {@code true} to remove the page, {@code false} to keep it
+   * @throws IndexingException if the filter cannot reach a decision
+   */
+  boolean remove(String url, WebPage page)
+  throws IndexingException;
+}

Added: 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java?rev=1513543&view=auto
==============================================================================
--- 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java 
(added)
+++ 
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java 
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.ObjectCache;
+
+/**
+ * Creates and caches the active {@link IndexCleaningFilter} plugins and
+ * exposes them as a single combined filter.
+ * <p>
+ * The instantiated filters are stored in the configuration's
+ * {@link ObjectCache} under the extension-point class name and are reused
+ * when already present there.
+ */
+public class IndexCleaningFilters {
+
+  /**
+   * Configuration key holding a whitespace-separated list of filter class
+   * names. When set, only the listed filters run, in the listed order; when
+   * unset, all active filters run in an unspecified order.
+   */
+  public static final String IndexCleaningFilter_ORDER = "indexcleaningfilter.order";
+
+  /**
+   * Misspelled key used by the first version of this class. It is still
+   * honoured, but only when {@link #IndexCleaningFilter_ORDER} is unset.
+   */
+  private static final String LEGACY_ORDER_KEY = "IndexCleaningFilterhbase.order";
+
+  public final static Logger LOG = LoggerFactory.getLogger(IndexCleaningFilters.class);
+
+  private IndexCleaningFilter[] indexcleaningFilters;
+
+  /**
+   * Loads the filters for {@code conf}, or reuses the cached instances.
+   *
+   * @throws RuntimeException if the extension point is missing or a plugin
+   *           cannot be instantiated
+   */
+  public IndexCleaningFilters(Configuration conf) {
+    ObjectCache objectCache = ObjectCache.get(conf);
+    String cacheKey = IndexCleaningFilter.class.getName();
+    this.indexcleaningFilters = (IndexCleaningFilter[]) objectCache.getObject(cacheKey);
+    if (this.indexcleaningFilters != null) {
+      return;
+    }
+
+    HashMap<String, IndexCleaningFilter> filterMap = loadFilters(conf);
+    String[] orderedFilters = orderedFilterNames(conf);
+    IndexCleaningFilter[] selected;
+    if (orderedFilters == null) {
+      selected = filterMap.values().toArray(new IndexCleaningFilter[filterMap.size()]);
+    } else {
+      ArrayList<IndexCleaningFilter> filters = new ArrayList<IndexCleaningFilter>();
+      for (String name : orderedFilters) {
+        IndexCleaningFilter filter = filterMap.get(name);
+        if (filter != null) {
+          filters.add(filter);
+        } else {
+          LOG.warn("Ignoring unknown index cleaning filter in order list: " + name);
+        }
+      }
+      selected = filters.toArray(new IndexCleaningFilter[filters.size()]);
+    }
+    objectCache.setObject(cacheKey, selected);
+    this.indexcleaningFilters = (IndexCleaningFilter[]) objectCache.getObject(cacheKey);
+  }
+
+  /** Returns the configured filter order, or null when none is configured. */
+  private static String[] orderedFilterNames(Configuration conf) {
+    String order = conf.get(IndexCleaningFilter_ORDER, conf.get(LEGACY_ORDER_KEY));
+    if (order == null || order.trim().length() == 0) {
+      return null;
+    }
+    // Trim first so leading whitespace does not yield an empty first name.
+    return order.trim().split("\\s+");
+  }
+
+  /** Instantiates one filter per implementing class, keyed by class name. */
+  private static HashMap<String, IndexCleaningFilter> loadFilters(Configuration conf) {
+    HashMap<String, IndexCleaningFilter> filterMap =
+        new HashMap<String, IndexCleaningFilter>();
+    try {
+      ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+          IndexCleaningFilter.X_POINT_ID);
+      if (point == null) {
+        throw new RuntimeException(IndexCleaningFilter.X_POINT_ID + " not found.");
+      }
+      for (Extension extension : point.getExtensions()) {
+        IndexCleaningFilter filter =
+            (IndexCleaningFilter) extension.getExtensionInstance();
+        String className = filter.getClass().getName();
+        if (!filterMap.containsKey(className)) {
+          LOG.info("Adding " + className);
+          filterMap.put(className, filter);
+        }
+      }
+    } catch (PluginRuntimeException e) {
+      throw new RuntimeException(e);
+    }
+    return filterMap;
+  }
+
+  /**
+   * Runs the filters in order and stops at the first one that asks for
+   * removal.
+   *
+   * @return {@code true} if any filter wants the page removed
+   * @throws IndexingException propagated from the first failing filter
+   */
+  public boolean remove(String url, WebPage page)
+  throws IndexingException {
+    for (IndexCleaningFilter indexcleaningFilter : indexcleaningFilters) {
+      if (indexcleaningFilter.remove(url, page)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Returns the union of the columns requested by all filters (never null). */
+  public Collection<WebPage.Field> getFields() {
+    Collection<WebPage.Field> columns = new HashSet<WebPage.Field>();
+    for (IndexCleaningFilter indexcleaningFilter : indexcleaningFilters) {
+      Collection<WebPage.Field> fields = indexcleaningFilter.getFields();
+      if (fields != null) {
+        columns.addAll(fields);
+      }
+    }
+    return columns;
+  }
+
+}

Added: nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java?rev=1513543&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java 
(added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java 
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.solr;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.indexer.IndexCleanerJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.ToolUtil;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+/**
+ * Removes the documents selected by {@link IndexCleanerJob} from a Solr index.
+ * <p>
+ * Usage: {@code SolrClean <solrurl> [-noCommit]}
+ */
+public class SolrClean extends IndexCleanerJob {
+
+  /** Maximum number of delete-by-id operations sent to Solr per request. */
+  public static final int NUM_MAX_DELETE_REQUEST = 1000;
+  public static final Logger LOG = LoggerFactory.getLogger(SolrClean.class);
+
+  /**
+   * Sends a delete-by-id to Solr for every key it receives, batched by
+   * {@link #NUM_MAX_DELETE_REQUEST}. If enabled, it commits once at the end of
+   * the task when anything was deleted.
+   */
+  public static class CleanReducer extends
+      Reducer<String, WebPage, NullWritable, NullWritable> {
+    /** Deletes buffered in {@link #updateRequest} but not yet sent. */
+    private int numDeletes = 0;
+    /** Deletes already sent by this task; decides whether to commit. */
+    private long totalDeletes = 0;
+    private SolrServer solr;
+    private UpdateRequest updateRequest = new UpdateRequest();
+    private boolean commit;
+
+    @Override
+    public void setup(Context job) throws IOException {
+      Configuration conf = job.getConfiguration();
+      commit = conf.getBoolean(ARG_COMMIT, true);
+      try {
+        solr = new CommonsHttpSolrServer(conf.get(SolrConstants.SERVER_URL));
+      } catch (MalformedURLException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void reduce(String key, Iterable<WebPage> values, Context context)
+        throws IOException {
+      updateRequest.deleteById(key);
+      numDeletes++;
+      if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+        flush(context);
+      }
+    }
+
+    @Override
+    public void cleanup(Context context) throws IOException {
+      flush(context);
+      // Use the running total, not numDeletes: batches flushed from reduce()
+      // leave numDeletes at 0, and those deletes must be committed too.
+      if (commit && totalDeletes > 0) {
+        try {
+          solr.commit();
+        } catch (SolrServerException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    /** Sends buffered deletes (if any) to Solr and starts a new batch. */
+    private void flush(Context context) throws IOException {
+      if (numDeletes == 0) {
+        return;
+      }
+      try {
+        updateRequest.process(solr);
+      } catch (SolrServerException e) {
+        throw new IOException(e);
+      }
+      context.getCounter("SolrClean", "DELETED").increment(numDeletes);
+      totalDeletes += numDeletes;
+      updateRequest = new UpdateRequest();
+      numDeletes = 0;
+    }
+  }
+
+  @Override
+  public Class<? extends Reducer<String, WebPage, NullWritable, NullWritable>> getReducerClass() {
+    return CleanReducer.class;
+  }
+
+  /**
+   * Runs the cleaning job against the given Solr server.
+   *
+   * @param solrUrl Solr server URL
+   * @param commit whether to commit the deletions
+   * @return 0 if the job succeeded, 1 otherwise
+   */
+  public int delete(String solrUrl, boolean commit) throws Exception {
+    LOG.info("CleanJob: starting");
+    run(ToolUtil.toArgMap(Nutch.ARG_SOLR, solrUrl, IndexCleanerJob.ARG_COMMIT, commit));
+    // Report failure through the exit code instead of always returning 0.
+    if (currentJob == null || !currentJob.isSuccessful()) {
+      LOG.error("CleanJob: failed");
+      return 1;
+    }
+    LOG.info("CleanJob: done");
+    return 0;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: SolrClean <solrurl> [-noCommit]");
+      return 1;
+    }
+
+    boolean commit = true;
+    if (args.length == 2 && args[1].equals("-noCommit")) {
+      commit = false;
+    }
+
+    return delete(args[0], commit);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(),
+        new SolrClean(), args);
+    System.exit(result);
+  }
+
+}

Modified: nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml
URL: 
http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml (original)
+++ nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml Tue Aug 13 
15:17:05 2013
@@ -29,6 +29,10 @@
       name="Nutch Indexing Filter"/>
 
 <extension-point
+      id="org.apache.nutch.indexer.IndexCleaningFilter"
+      name="Nutch Index Cleaning Filter"/>
+
+<extension-point
       id="org.apache.nutch.parse.Parser"
       name="Nutch Content Parser"/>
  


Reply via email to