Author: fenglu
Date: Tue Aug 13 15:17:05 2013
New Revision: 1513543
URL: http://svn.apache.org/r1513543
Log:
NUTCH-1294 IndexClean job with solr implementation.
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
Modified:
nutch/branches/2.x/conf/log4j.properties
nutch/branches/2.x/src/bin/nutch
nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml
Modified: nutch/branches/2.x/conf/log4j.properties
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/conf/log4j.properties?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/conf/log4j.properties (original)
+++ nutch/branches/2.x/conf/log4j.properties Tue Aug 13 15:17:05 2013
@@ -36,6 +36,8 @@ log4j.logger.org.apache.nutch.indexer.In
log4j.logger.org.apache.nutch.indexer.solr.SolrIndexerJob=INFO,cmdstdout
log4j.logger.org.apache.nutch.indexer.solr.SolrWriter=INFO,cmdstdout
log4j.logger.org.apache.nutch.indexer.DeleteDuplicates=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.solr.SolrClean=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.IndexCleanerJob=INFO,cmdstdout
log4j.logger.org.apache.nutch.crawl.WebTableReader=INFO,cmdstdout
log4j.logger.org.apache.nutch.host.HostDbReader=INFO,cmdstdout
log4j.logger.org.apache.nutch.parse.ParserChecker=INFO,cmdstdout
Modified: nutch/branches/2.x/src/bin/nutch
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/bin/nutch?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/src/bin/nutch (original)
+++ nutch/branches/2.x/src/bin/nutch Tue Aug 13 15:17:05 2013
@@ -60,6 +60,7 @@ if [ $# = 0 ]; then
echo " elasticindex run the elasticsearch indexer"
echo " solrindex run the solr indexer on parsed batches"
echo " solrdedup remove duplicates from solr"
+ echo " solrclean configurable extension to remove various documents from solr"
echo " parsechecker check the parser for a given url"
echo " indexchecker check the indexing filters for a given url"
echo " plugin load a plugin and run one of its classes main()"
@@ -209,6 +210,8 @@ elif [ "$COMMAND" = "solrindex" ] ; then
CLASS=org.apache.nutch.indexer.solr.SolrIndexerJob
elif [ "$COMMAND" = "solrdedup" ] ; then
CLASS=org.apache.nutch.indexer.solr.SolrDeleteDuplicates
+elif [ "$COMMAND" = "solrclean" ] ; then
+CLASS=org.apache.nutch.indexer.solr.SolrClean
elif [ "$COMMAND" = "parsechecker" ] ; then
CLASS=org.apache.nutch.parse.ParserChecker
elif [ "$COMMAND" = "indexchecker" ] ; then
Added: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java?rev=1513543&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
(added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleanerJob.java
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.mapreduce.GoraMapper;
+import org.apache.gora.mapreduce.StringComparator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.solr.SolrConstants;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
+
+public abstract class IndexCleanerJob extends NutchTool implements Tool {
+
+ public static final String ARG_COMMIT = "commit";
+ public static final Logger LOG =
LoggerFactory.getLogger(IndexCleanerJob.class);
+ private Configuration conf;
+
+ private static final Collection<WebPage.Field> FIELDS = new
HashSet<WebPage.Field>();
+
+ static {
+ FIELDS.add(WebPage.Field.STATUS);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public static class CleanMapper extends
+ GoraMapper<String, WebPage, String, WebPage> {
+
+ private IndexCleaningFilters filters;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ filters = new IndexCleaningFilters(conf);
+ }
+
+ @Override
+ public void map(String key, WebPage page, Context context)
+ throws IOException, InterruptedException {
+ try {
+ if(page.getStatus() == CrawlStatus.STATUS_GONE
|| filters.remove(key, page)) {
+ context.write(key, page);
+ }
+ } catch (IndexingException e) {
+ LOG.warn("Error indexing "+key+": "+e);
+ }
+ }
+ }
+
+ public Collection<WebPage.Field> getFields(Job job) {
+ Configuration conf = job.getConfiguration();
+ Collection<WebPage.Field> columns = new
HashSet<WebPage.Field>(FIELDS);
+ IndexCleaningFilters filters = new IndexCleaningFilters(conf);
+ columns.addAll(filters.getFields());
+ return columns;
+ }
+
+ public abstract Class<? extends Reducer<String, WebPage, NullWritable,
NullWritable>> getReducerClass();
+
+ @Override
+ public Map<String, Object> run(Map<String, Object> args) throws
Exception {
+ String solrUrl = (String) args.get(Nutch.ARG_SOLR);
+ getConf().set(SolrConstants.SERVER_URL, solrUrl);
+ getConf().setBoolean(ARG_COMMIT,(Boolean)args.get(ARG_COMMIT));
+ currentJob = new NutchJob(getConf(), "index-clean");
+ currentJob.getConfiguration().setClass(
+ "mapred.output.key.comparator.class",
StringComparator.class,
+ RawComparator.class);
+
+ Collection<WebPage.Field> fields = getFields(currentJob);
+ StorageUtils.initMapperJob(currentJob, fields, String.class,
+ WebPage.class, CleanMapper.class);
+ currentJob.setReducerClass(getReducerClass());
+ currentJob.setOutputFormatClass(NullOutputFormat.class);
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ return results;
+ }
+
+}
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java?rev=1513543&view=auto
==============================================================================
---
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
(added)
+++
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configurable;
+import org.apache.nutch.plugin.FieldPluggable;
+import org.apache.nutch.storage.WebPage;
+
+
+/** Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
+ * sequentially on the parse.
+ */
+public interface IndexCleaningFilter extends FieldPluggable, Configurable {
+ /** The name of the extension point. */
+ final static String X_POINT_ID = IndexCleaningFilter.class.getName();
+
+ /**
+ * @param url page url
+ * @param page
+ * @return true == remove false == keep
+ * @throws IndexingException
+ */
+ boolean remove(String url, WebPage page)
+ throws IndexingException;
+}
Added:
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java?rev=1513543&view=auto
==============================================================================
---
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
(added)
+++
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.ObjectCache;
+
+/** Creates and caches {@link IndexCleaningFilter} implementing plugins.*/
+public class IndexCleaningFilters {
+
+ public static final String IndexCleaningFilter_ORDER =
"IndexCleaningFilterhbase.order";
+
+ public final static Logger LOG =
LoggerFactory.getLogger(IndexCleaningFilters.class);
+
+ private IndexCleaningFilter[] indexcleaningFilters;
+
+ public IndexCleaningFilters(Configuration conf) {
+ /* Get IndexCleaningFilter.order property */
+ String order = conf.get(IndexCleaningFilter_ORDER);
+ ObjectCache objectCache = ObjectCache.get(conf);
+ this.indexcleaningFilters = (IndexCleaningFilter[]) objectCache
+ .getObject(IndexCleaningFilter.class.getName());
+ if (this.indexcleaningFilters == null) {
+ /*
+ * If ordered filters are required, prepare array of filters based on
+ * property
+ */
+ String[] orderedFilters = null;
+ if (order != null && !order.trim().equals("")) {
+ orderedFilters = order.split("\\s+");
+ }
+ try {
+ ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+ IndexCleaningFilter.X_POINT_ID);
+ if (point == null)
+ throw new RuntimeException(IndexCleaningFilter.X_POINT_ID + " not
found.");
+ Extension[] extensions = point.getExtensions();
+ HashMap<String, IndexCleaningFilter> filterMap =
+ new HashMap<String, IndexCleaningFilter>();
+ for (int i = 0; i < extensions.length; i++) {
+ Extension extension = extensions[i];
+ IndexCleaningFilter filter = (IndexCleaningFilter) extension
+ .getExtensionInstance();
+ LOG.info("Adding " + filter.getClass().getName());
+ if (!filterMap.containsKey(filter.getClass().getName())) {
+ filterMap.put(filter.getClass().getName(), filter);
+ }
+ }
+ /*
+ * If no ordered filters required, just get the filters in an
+ * indeterminate order
+ */
+ if (orderedFilters == null) {
+ objectCache.setObject(IndexCleaningFilter.class.getName(),
+ filterMap.values().toArray(
+ new IndexCleaningFilter[0]));
+ /* Otherwise run the filters in the required order */
+ } else {
+ ArrayList<IndexCleaningFilter> filters = new
ArrayList<IndexCleaningFilter>();
+ for (int i = 0; i < orderedFilters.length; i++) {
+ IndexCleaningFilter filter = filterMap.get(orderedFilters[i]);
+ if (filter != null) {
+ filters.add(filter);
+ }
+ }
+ objectCache.setObject(IndexCleaningFilter.class.getName(), filters
+ .toArray(new IndexCleaningFilter[filters.size()]));
+ }
+ } catch (PluginRuntimeException e) {
+ throw new RuntimeException(e);
+ }
+ this.indexcleaningFilters = (IndexCleaningFilter[]) objectCache
+ .getObject(IndexCleaningFilter.class.getName());
+ }
+ }
+ /** Run all defined filters. */
+ public boolean remove(String url, WebPage page)
+ throws IndexingException {
+ for (IndexCleaningFilter indexcleaningFilter : indexcleaningFilters) {
+ if(indexcleaningFilter.remove(url,page)){
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public Collection<WebPage.Field> getFields() {
+ Collection<WebPage.Field> columns = new HashSet<WebPage.Field>();
+ for (IndexCleaningFilter indexcleaningFilter : indexcleaningFilters) {
+ Collection<WebPage.Field> fields = indexcleaningFilter.getFields();
+ if (fields != null) {
+ columns.addAll(fields);
+ }
+ }
+ return columns;
+ }
+
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java?rev=1513543&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
(added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrClean.java
Tue Aug 13 15:17:05 2013
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer.solr;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.indexer.IndexCleanerJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.ToolUtil;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+public class SolrClean extends IndexCleanerJob {
+
+ public static final int NUM_MAX_DELETE_REQUEST = 1000;
+ public static final Logger LOG =
LoggerFactory.getLogger(SolrClean.class);
+
+ public static class CleanReducer extends
+ Reducer<String, WebPage, NullWritable, NullWritable> {
+ private int numDeletes = 0;
+ private SolrServer solr;
+ private UpdateRequest updateRequest = new UpdateRequest();
+ private boolean commit;
+
+ @Override
+ public void setup(Context job) throws IOException {
+ Configuration conf = job.getConfiguration();
+ commit = conf.getBoolean(ARG_COMMIT, true);
+ try {
+ solr = new CommonsHttpSolrServer(
+
conf.get(SolrConstants.SERVER_URL));
+ } catch (MalformedURLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void reduce(String key, Iterable<WebPage> values,
Context context)
+ throws IOException {
+ updateRequest.deleteById(key);
+ numDeletes++;
+ if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+ try {
+ updateRequest.process(solr);
+ context.getCounter("SolrClean",
"DELETED").increment(
+ numDeletes);
+
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ updateRequest = new UpdateRequest();
+ numDeletes = 0;
+ }
+ }
+
+ @Override
+ public void cleanup(Context context) throws IOException {
+ try {
+ if (numDeletes > 0) {
+ updateRequest.process(solr);
+ context.getCounter("SolrClean",
"DELETED").increment(
+ numDeletes);
+ if (commit) {
+ solr.commit();
+ }
+ }
+ } catch (SolrServerException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public Class<? extends Reducer<String, WebPage, NullWritable,
NullWritable>> getReducerClass(){
+ return CleanReducer.class;
+ }
+
+ public int delete(String solrUrl, boolean commit) throws Exception {
+ LOG.info("CleanJob: starting");
+ run(ToolUtil.toArgMap(Nutch.ARG_SOLR, solrUrl,
IndexCleanerJob.ARG_COMMIT, commit));
+ LOG.info("CleanJob: done");
+ return 0;
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("Usage: SolrClean <solrurl>
[-noCommit]");
+ return 1;
+ }
+
+ boolean commit = true;
+ if (args.length == 2 && args[1].equals("-noCommit")) {
+ commit = false;
+ }
+
+ return delete(args[0], commit);
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(NutchConfiguration.create(),
+ new SolrClean(), args);
+ System.exit(result);
+ }
+
+}
Modified: nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml?rev=1513543&r1=1513542&r2=1513543&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml (original)
+++ nutch/branches/2.x/src/plugin/nutch-extensionpoints/plugin.xml Tue Aug 13
15:17:05 2013
@@ -29,6 +29,10 @@
name="Nutch Indexing Filter"/>
<extension-point
+ id="org.apache.nutch.indexer.IndexCleaningFilter"
+ name="Nutch Index Cleaning Filter"/>
+
+<extension-point
id="org.apache.nutch.parse.Parser"
name="Nutch Content Parser"/>