Author: ab
Date: Thu Oct 21 12:01:36 2010
New Revision: 1025963

URL: http://svn.apache.org/viewvc?rev=1025963&view=rev
Log:
NUTCH-907 DataStore API doesn't support multiple storage areas for multiple 
disjoint crawls.

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorReducer.java
    nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
    nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
    nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
    nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
    nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
    nutch/trunk/src/java/org/apache/nutch/storage/StorageUtils.java
    nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java
    nutch/trunk/src/java/org/apache/nutch/util/NutchJob.java
    nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
    nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java
    nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java
    nutch/trunk/src/test/org/apache/nutch/util/AbstractNutchTest.java

Modified: nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Oct 21 12:01:36 2010
@@ -74,6 +74,8 @@ Release 2.0 - Current Development
 
 * NUTCH-921 Reduce dependency of Nutch on config files (ab)
 
+* NUTCH-907 DataStore API doesn't support multiple storage areas for multiple 
disjoint crawls (Sertan Alkan via ab)
+
 
 Release 1.1 - 2010-06-06
 

Modified: nutch/trunk/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Thu Oct 21 12:01:36 2010
@@ -1041,4 +1041,25 @@
   <description>Default class for storing data</description>
 </property>
 
+<property>
+  <name>storage.schema</name>
+  <value>webpage</value>
+  <description>This value holds the schema name used for Nutch web db.
+  Note that Nutch ignores the value in the gora mapping files, and uses
+  this as the schema name.
+  </description>
+</property>
+
+<property>
+  <name>storage.crawl.id</name>
+  <value></value>
+  <description>This value helps differentiate between the datasets that
+  the jobs in the crawl cycle generate and operate on. The value will
+  be input to all the jobs which then will use it as a prefix when
+  accessing to the schemas. The default configuration uses no id to prefix
+  the schemas. The value could also be given as a command line argument
+  to each job.
+  </description>
+</property>
+
 </configuration>

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java Thu Oct 21 
12:01:36 2010
@@ -11,6 +11,7 @@ import org.apache.hadoop.io.RawComparato
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
@@ -42,11 +43,11 @@ implements Tool {
     FIELDS.add(WebPage.Field.PREV_FETCH_TIME);
     FIELDS.add(WebPage.Field.PREV_SIGNATURE);
   }
-  
+
   public DbUpdaterJob() {
-    
+
   }
-  
+
   public DbUpdaterJob(Configuration conf) {
     setConf(conf);
   }
@@ -75,7 +76,10 @@ implements Tool {
   }
 
   public int run(String[] args) throws Exception {
-       return updateTable();
+    if (args.length == 2 && "-crawlId".equals(args[0])) {
+      getConf().set(Nutch.CRAWL_ID_KEY, args[1]);
+    }
+    return updateTable();
   }
 
   public static void main(String[] args) throws Exception {

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java Thu Oct 21 
12:01:36 2010
@@ -38,7 +38,7 @@ public class GeneratorJob extends Config
   public static final String GENERATOR_CUR_TIME = "generate.curTime";
   public static final String GENERATOR_DELAY = "crawl.gen.delay";
   public static final String GENERATOR_RANDOM_SEED = "generate.partition.seed";
-  public static final String CRAWL_ID = "generate.crawl.id";
+  public static final String BATCH_ID = "generate.batch.id";
 
   private static final Set<WebPage.Field> FIELDS = new 
HashSet<WebPage.Field>();
 
@@ -114,13 +114,13 @@ public class GeneratorJob extends Config
   }
 
   public GeneratorJob() {
-    
+
   }
-  
+
   public GeneratorJob(Configuration conf) {
     setConf(conf);
   }
-  
+
   /**
    * Mark URLs ready for fetching.
    * @throws ClassNotFoundException
@@ -141,9 +141,9 @@ public class GeneratorJob extends Config
     getConf().setLong(GENERATOR_TOP_N, topN);
     getConf().setBoolean(GENERATOR_FILTER, filter);
     int randomSeed = Math.abs(new Random().nextInt());
-    String crawlId = (curTime / 1000) + "-" + randomSeed;
+    String batchId = (curTime / 1000) + "-" + randomSeed;
     getConf().setInt(GENERATOR_RANDOM_SEED, randomSeed);
-    getConf().set(CRAWL_ID, crawlId);
+    getConf().set(BATCH_ID, batchId);
     getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
     getConf().setBoolean(GENERATOR_NORMALISE, norm);
     String mode = getConf().get(GENERATOR_COUNT_MODE, 
GENERATOR_COUNT_VALUE_HOST);
@@ -157,17 +157,17 @@ public class GeneratorJob extends Config
       getConf().set(URLPartitioner.PARTITION_MODE_KEY, 
URLPartitioner.PARTITION_MODE_HOST);
     }
 
-    Job job = new NutchJob(getConf(), "generate: " + crawlId);
-    StorageUtils.initMapperJob(job, FIELDS, SelectorEntry.class, WebPage.class,
-        GeneratorMapper.class, URLPartitioner.class);
+    Job job = new NutchJob(getConf(), "generate: " + batchId);
+    StorageUtils.initMapperJob(job, FIELDS, SelectorEntry.class,
+        WebPage.class, GeneratorMapper.class, URLPartitioner.class, true);
     StorageUtils.initReducerJob(job, GeneratorReducer.class);
 
     boolean success = job.waitForCompletion(true);
     if (!success) return null;
 
     LOG.info("GeneratorJob: done");
-    LOG.info("GeneratorJob: generated crawl id: " + crawlId);
-    return crawlId;
+    LOG.info("GeneratorJob: generated batch id: " + batchId);
+    return batchId;
   }
 
   public int run(String[] args) throws Exception {
@@ -181,6 +181,8 @@ public class GeneratorJob extends Config
         filter = false;
       } else if ("-noNorm".equals(args[i])) {
         norm = false;
+      } else if ("-crawlId".equals(args[i])) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
       }
     }
 

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorReducer.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorReducer.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorReducer.java Thu Oct 
21 12:01:36 2010
@@ -28,7 +28,7 @@ extends GoraReducer<SelectorEntry, WebPa
   private long count = 0;
   private boolean byDomain = false;
   private Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
-  private Utf8 crawlId;
+  private Utf8 batchId;
 
   @Override
   protected void reduce(SelectorEntry key, Iterable<WebPage> values,
@@ -56,7 +56,7 @@ extends GoraReducer<SelectorEntry, WebPa
         return;
       }
 
-      Mark.GENERATE_MARK.putMark(page, crawlId);
+      Mark.GENERATE_MARK.putMark(page, batchId);
       context.write(TableUtil.reverseUrl(key.url), page);
       context.getCounter("Generator", "GENERATE_MARK").increment(1);
       count++;
@@ -74,7 +74,7 @@ extends GoraReducer<SelectorEntry, WebPa
       limit = totalLimit / context.getNumReduceTasks();
     }
     maxCount = conf.getLong(GeneratorJob.GENERATOR_MAX_COUNT, -2);
-    crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID));
+    batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID));
     String countMode =
       conf.get(GeneratorJob.GENERATOR_COUNT_MODE, 
GeneratorJob.GENERATOR_COUNT_VALUE_HOST);
     if (countMode.equals(GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN)) {

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java Thu Oct 21 
12:01:36 2010
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
@@ -34,6 +35,7 @@ import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TableUtil;
 import org.gora.mapreduce.GoraMapper;
 import org.gora.mapreduce.GoraOutputFormat;
+import org.gora.store.DataStore;
 
 /** This class takes a flat file of URLs and adds them to the of pages to be
  * crawled.  Useful for bootstrapping the system.
@@ -172,13 +174,13 @@ public class InjectorJob extends GoraMap
   }
 
   public InjectorJob() {
-    
+
   }
-  
+
   public InjectorJob(Configuration conf) {
     setConf(conf);
   }
-  
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -223,15 +225,16 @@ public class InjectorJob extends GoraMap
     job.setMapOutputKeyClass(String.class);
     job.setMapOutputValueClass(WebPage.class);
     job.setOutputFormatClass(GoraOutputFormat.class);
-    GoraOutputFormat.setOutput(job, String.class,
-        WebPage.class, StorageUtils.getDataStoreClass(getConf()), true);
+    DataStore<String, WebPage> store = 
StorageUtils.createWebStore(job.getConfiguration(),
+        String.class, WebPage.class);
+    GoraOutputFormat.setOutput(job, store, true);
     job.setReducerClass(Reducer.class);
     job.setNumReduceTasks(0);
     job.waitForCompletion(true);
 
     job = new NutchJob(getConf(), "inject-p2 " + urlDir);
-    StorageUtils.initMapperJob(job, FIELDS, String.class, WebPage.class,
-        InjectorJob.class);
+    StorageUtils.initMapperJob(job, FIELDS, String.class,
+        WebPage.class, InjectorJob.class);
     job.setNumReduceTasks(0);
     job.waitForCompletion(true);
   }
@@ -239,9 +242,13 @@ public class InjectorJob extends GoraMap
   @Override
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
-      System.err.println("Usage: InjectorJob <url_dir>");
+      System.err.println("Usage: InjectorJob <url_dir> [-crawlId <id>]");
       return -1;
     }
+    if (args.length == 3 && "-crawlId".equals(args[1])) {
+      getConf().set(Nutch.CRAWL_ID_KEY, args[2]);
+    }
+
     try {
       inject(new Path(args[0]));
       LOG.info("InjectorJob: finished");

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java Thu Oct 21 
12:01:36 2010
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.parse.ParseStatusUtils;
 import org.apache.nutch.protocol.ProtocolStatusUtils;
 import org.apache.nutch.storage.ParseStatus;
@@ -59,6 +60,7 @@ public class WebTableReader extends Conf
     public WebTableStatMapper() {
     }
 
+    @Override
     public void setup(Context context) {
       sort = context.getConfiguration().getBoolean("db.reader.stats.sort",
           false);
@@ -92,9 +94,11 @@ public class WebTableReader extends Conf
       Reducer<Text, LongWritable, Text, LongWritable> {
     LongWritable val = new LongWritable();
 
+    @Override
     public void setup(Context context) {
     }
 
+    @Override
     public void cleanup(Context context) {
     }
 
@@ -136,6 +140,7 @@ public class WebTableReader extends Conf
   public static class WebTableStatReducer extends
       Reducer<Text, LongWritable, Text, LongWritable> {
 
+    @Override
     public void cleanup(Context context) {
     }
 
@@ -204,7 +209,7 @@ public class WebTableReader extends Conf
 
     job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
 
-    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(job
         .getConfiguration(), String.class, WebPage.class);
     Query<String, WebPage> query = store.newQuery();
     query.setFields(WebPage._ALL_FIELDS);
@@ -303,7 +308,7 @@ public class WebTableReader extends Conf
   /** Prints out the entry to the standard out **/
   private void read(String key, boolean dumpContent, boolean dumpHeaders,
       boolean dumpLinks, boolean dumpText) throws ClassNotFoundException, 
IOException {
-    DataStore<String, WebPage> datastore = 
StorageUtils.createDataStore(getConf(),
+    DataStore<String, WebPage> datastore = 
StorageUtils.createWebStore(getConf(),
         String.class, WebPage.class);
 
     Query<String, WebPage> query = datastore.newQuery();
@@ -391,8 +396,8 @@ public class WebTableReader extends Conf
     cfg.setBoolean(WebTableRegexMapper.headersParamName, headers);
     cfg.setBoolean(WebTableRegexMapper.linksParamName, links);
     cfg.setBoolean(WebTableRegexMapper.textParamName, text);
-    
-    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(job
         .getConfiguration(), String.class, WebPage.class);
     Query<String, WebPage> query = store.newQuery();
     query.setFields(WebPage._ALL_FIELDS);
@@ -425,9 +430,9 @@ public class WebTableReader extends Conf
     sb.append("prevFetchTime:\t" + page.getPrevFetchTime()).append("\n");
     sb.append("retries:\t" + page.getRetriesSinceFetch()).append("\n");
     sb.append("modifiedTime:\t" + page.getModifiedTime()).append("\n");
-    sb.append("protocolStatus:\t" + 
+    sb.append("protocolStatus:\t" +
         ProtocolStatusUtils.toString(page.getProtocolStatus())).append("\n");
-    sb.append("parseStatus:\t" + 
+    sb.append("parseStatus:\t" +
         ParseStatusUtils.toString(page.getParseStatus())).append("\n");
     sb.append("title:\t" + page.getTitle()).append("\n");
     sb.append("score:\t" + page.getScore()).append("\n");
@@ -467,7 +472,7 @@ public class WebTableReader extends Conf
       if (headers != null) {
         for (Entry<Utf8,Utf8> e : headers.entrySet()) {
           sb.append("header:\t" + e.getKey() + "\t" + e.getValue() + "\n");
-        }        
+        }
       }
     }
     ByteBuffer content = page.getContent();
@@ -481,9 +486,9 @@ public class WebTableReader extends Conf
     if (text != null && dumpText) {
       sb.append("text:start:\n");
       sb.append(text.toString());
-      sb.append("\ntext:end:\n");      
+      sb.append("\ntext:end:\n");
     }
-    
+
     return sb.toString();
   }
 
@@ -492,13 +497,14 @@ public class WebTableReader extends Conf
         args);
     System.exit(res);
   }
-  
+
   private static enum Op {READ, STAT, DUMP};
 
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
       System.err
-          .println("Usage: WebTableReader (-stats | -url [url] | -dump 
<out_dir> [-regex regex]) [-content] [-headers] [-links] [-text]");
+          .println("Usage: WebTableReader (-stats | -url [url] | -dump 
<out_dir> [-regex regex]) [-crawlId <id>] [-content] [-headers] [-links] 
[-text]");
+      System.err.println("\t-crawlId <id>\t the id to prefix the schemas to 
operate on, (default: storage.crawl.id)");
       System.err
           .println("\t-stats [-sort] \tprint overall statistics to 
System.out");
       System.err.println("\t\t[-sort]\tlist status sorted by host");
@@ -530,7 +536,7 @@ public class WebTableReader extends Conf
           //read(param);
           //return 0;
         } else if (args[i].equals("-stats")) {
-          op = op.STAT;
+          op = Op.STAT;
         } else if (args[i].equals("-sort")) {
           toSort = true;
         } else if (args[i].equals("-dump")) {
@@ -546,6 +552,8 @@ public class WebTableReader extends Conf
           text = true;
         } else if (args[i].equals("-regex")) {
           regex = args[++i];
+        } else if (args[i].equals("-crawlId")) {
+          getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
         }
       }
       if (op == null) {

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java Thu Oct 21 
12:01:36 2010
@@ -38,7 +38,7 @@ public class FetcherJob implements Tool 
   public static final int PERM_REFRESH_TIME = 5;
 
   public static final Utf8 REDIRECT_DISCOVERED = new Utf8("___rdrdsc__");
-  
+
   public static final String RESUME_KEY = "fetcher.job.resume";
   public static final String PARSE_KEY = "fetcher.parse";
   public static final String THREADS_KEY = "fetcher.threads.fetch";
@@ -72,7 +72,7 @@ public class FetcherJob implements Tool 
 
     private boolean shouldContinue;
 
-    private Utf8 crawlId;
+    private Utf8 batchId;
 
     private Random random = new Random();
 
@@ -80,16 +80,16 @@ public class FetcherJob implements Tool 
     protected void setup(Context context) {
       Configuration conf = context.getConfiguration();
       shouldContinue = conf.getBoolean(RESUME_KEY, false);
-      crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, 
Nutch.ALL_CRAWL_ID_STR));
+      batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, 
Nutch.ALL_BATCH_ID_STR));
     }
 
     @Override
     protected void map(String key, WebPage page, Context context)
         throws IOException, InterruptedException {
       Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
-      if (!NutchJob.shouldProcess(mark, crawlId)) {
+      if (!NutchJob.shouldProcess(mark, batchId)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
crawl id");
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
batch id");
         }
         return;
       }
@@ -107,11 +107,11 @@ public class FetcherJob implements Tool 
   public static final Logger LOG = LoggerFactory.getLogger(FetcherJob.class);
 
   private Configuration conf;
-  
+
   public FetcherJob() {
-    
+
   }
-  
+
   public FetcherJob(Configuration conf) {
     setConf(conf);
   }
@@ -140,7 +140,7 @@ public class FetcherJob implements Tool 
 
   /**
    * Run fetcher.
-   * @param crawlId crawlId (obtained from Generator) or null to fetch all 
generated fetchlists
+   * @param batchId batchId (obtained from Generator) or null to fetch all 
generated fetchlists
    * @param threads number of threads per map task
    * @param shouldResume
    * @param parse if true, then parse content immediately, if false then a 
separate
@@ -150,7 +150,8 @@ public class FetcherJob implements Tool 
    * @return 0 on success
    * @throws Exception
    */
-  public int fetch(String crawlId, int threads, boolean shouldResume, boolean 
parse, int numTasks)
+  public int fetch(String batchId, int threads,
+      boolean shouldResume, boolean parse, int numTasks)
       throws Exception {
     LOG.info("FetcherJob: starting");
 
@@ -159,10 +160,10 @@ public class FetcherJob implements Tool 
     if (threads > 0) {
       getConf().setInt(THREADS_KEY, threads);
     }
-    getConf().set(GeneratorJob.CRAWL_ID, crawlId);
+    getConf().set(GeneratorJob.BATCH_ID, batchId);
     getConf().setBoolean(PARSE_KEY, parse);
     getConf().setBoolean(RESUME_KEY, shouldResume);
-    
+
     // set the actual time for the timelimit relative
     // to the beginning of the whole job and not of a specific task
     // otherwise it keeps trying again if a task fails
@@ -176,10 +177,10 @@ public class FetcherJob implements Tool 
     LOG.info("FetcherJob: threads: " + getConf().getInt(THREADS_KEY, 10));
     LOG.info("FetcherJob: parsing: " + getConf().getBoolean(PARSE_KEY, true));
     LOG.info("FetcherJob: resuming: " + getConf().getBoolean(RESUME_KEY, 
false));
-    if (crawlId.equals(Nutch.ALL_CRAWL_ID_STR)) {
+    if (batchId.equals(Nutch.ALL_BATCH_ID_STR)) {
       LOG.info("FetcherJob: fetching all");
     } else {
-      LOG.info("FetcherJob: crawlId: " + crawlId);
+      LOG.info("FetcherJob: batchId: " + batchId);
     }
 
     Job job = new NutchJob(getConf(), "fetch");
@@ -242,10 +243,12 @@ public class FetcherJob implements Tool 
     int threads = -1;
     boolean shouldResume = false;
     boolean parse = getConf().getBoolean(PARSE_KEY, false);
-    String crawlId;
+    String batchId;
 
-    String usage = "Usage: FetcherJob (<crawl id> | -all) [-threads N] 
[-parse] [-resume] [-numTasks N]\n" +
-      "\tcrawlId\tcrawl identifier returned by Generator, or -all for all 
generated crawlId-s\n" +
+    String usage = "Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] " +
+      "[-threads N] [-parse] [-resume] [-numTasks N]\n" +
+      "\tbatchId\tcrawl identifier returned by Generator, or -all for all 
generated batchId-s\n" +
+      "\t-crawlId <id>\t the id to prefix the schemas to operate on, (default: 
storage.crawl.id)\n" +
       "\t-threads N\tnumber of fetching threads per task\n" +
       "\t-parse\tif specified then fetcher will immediately parse fetched 
content\n" +
       "\t-resume\tresume interrupted job\n" +
@@ -256,8 +259,8 @@ public class FetcherJob implements Tool 
       return -1;
     }
 
-    crawlId = args[0];
-    if (!crawlId.equals("-all") && crawlId.startsWith("-")) {
+    batchId = args[0];
+    if (!batchId.equals("-all") && batchId.startsWith("-")) {
       System.err.println(usage);
       return -1;
     }
@@ -271,11 +274,13 @@ public class FetcherJob implements Tool 
       } else if ("-parse".equals(args[i])) {
         parse = true;
       } else if ("-numTasks".equals(args[i])) {
-               numTasks = Integer.parseInt(args[++i]);
+        numTasks = Integer.parseInt(args[++i]);
+      } else if ("-crawlId".equals(args[i])) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
       }
     }
 
-    int fetchcode = fetch(crawlId, threads, shouldResume, parse, numTasks); // 
run the Fetcher
+    int fetchcode = fetch(batchId, threads, shouldResume, parse, numTasks); // 
run the Fetcher
 
     return fetchcode;
   }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java Thu Oct 21 
12:01:36 2010
@@ -37,7 +37,7 @@ implements Tool {
 
   private Configuration conf;
 
-  protected Utf8 crawlId;
+  protected Utf8 batchId;
 
   static {
     FIELDS.add(WebPage.Field.SIGNATURE);
@@ -59,7 +59,7 @@ implements Tool {
   @Override
   public void setup(Context context) throws IOException {
     Configuration conf = context.getConfiguration();
-    crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, 
Nutch.ALL_CRAWL_ID_STR));
+    batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, 
Nutch.ALL_BATCH_ID_STR));
   }
 
   @Override
@@ -72,10 +72,10 @@ implements Tool {
     }
 
     Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
-    if (!crawlId.equals(REINDEX)) {
-      if (!NutchJob.shouldProcess(mark, crawlId)) {
+    if (!batchId.equals(REINDEX)) {
+      if (!NutchJob.shouldProcess(mark, batchId)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
crawl id");
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
batch id");
         }
         return;
       }
@@ -94,9 +94,9 @@ implements Tool {
     return columns;
   }
 
-  protected Job createIndexJob(Configuration conf, String jobName, String 
crawlId)
+  protected Job createIndexJob(Configuration conf, String jobName, String 
batchId)
   throws IOException, ClassNotFoundException {
-    conf.set(GeneratorJob.CRAWL_ID, crawlId);
+    conf.set(GeneratorJob.BATCH_ID, batchId);
     Job job = new NutchJob(conf, jobName);
     // TODO: Figure out why this needs to be here
     job.getConfiguration().setClass("mapred.output.key.comparator.class",

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java Thu 
Oct 21 12:01:36 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.indexer.IndexerJob;
 import org.apache.nutch.indexer.NutchIndexWriterFactory;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
@@ -36,13 +37,13 @@ public class SolrIndexerJob extends Inde
 
   public static Logger LOG = LoggerFactory.getLogger(SolrIndexerJob.class);
 
-  private void indexSolr(String solrUrl, String crawlId) throws Exception {
+  private void indexSolr(String solrUrl, String batchId) throws Exception {
     LOG.info("SolrIndexerJob: starting");
 
     NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
     getConf().set(SolrConstants.SERVER_URL, solrUrl);
 
-    Job job = createIndexJob(getConf(), "solr-index", crawlId);
+    Job job = createIndexJob(getConf(), "solr-index", batchId);
     Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
                 + new Random().nextInt());
 
@@ -61,10 +62,13 @@ public class SolrIndexerJob extends Inde
 
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: SolrIndexerJob <solr url> (<crawl id> | -all 
| -reindex)");
+      System.err.println("Usage: SolrIndexerJob <solr url> (<batch id> | -all 
| -reindex) [-crawlId <id>]");
       return -1;
     }
 
+    if (args.length == 4 && "-crawlId".equals(args[2])) {
+      getConf().set(Nutch.CRAWL_ID_KEY, args[3]);
+    }
     try {
       indexSolr(args[0], args[1]);
       return 0;

Modified: nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java Thu Oct 21 
12:01:36 2010
@@ -70,7 +70,9 @@ public interface Nutch {
 
   public static final Text WRITABLE_REPR_URL_KEY = new Text(REPR_URL_KEY);
 
-  public static final String ALL_CRAWL_ID_STR = "-all";
+  public static final String ALL_BATCH_ID_STR = "-all";
 
-  public static final Utf8 ALL_CRAWL_ID = new Utf8(ALL_CRAWL_ID_STR);
+  public static final Utf8 ALL_CRAWL_ID = new Utf8(ALL_BATCH_ID_STR);
+
+  public static final String CRAWL_ID_KEY = "storage.crawl.id";
 }

Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java Thu Oct 21 
12:01:36 2010
@@ -29,7 +29,7 @@ public class ParserJob extends GoraMappe
     implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(ParserJob.class);
-  
+
   private static final String RESUME_KEY = "parse.job.resume";
   private static final String FORCE_KEY = "parse.job.force";
 
@@ -51,15 +51,15 @@ public class ParserJob extends GoraMappe
   private ParseUtil parseUtil;
 
   private boolean shouldResume;
-  
+
   private boolean force;
 
-  private Utf8 crawlId;
-  
+  private Utf8 batchId;
+
   public ParserJob() {
-    
+
   }
-  
+
   public ParserJob(Configuration conf) {
     setConf(conf);
   }
@@ -69,17 +69,17 @@ public class ParserJob extends GoraMappe
     Configuration conf = context.getConfiguration();
     parseUtil = new ParseUtil(conf);
     shouldResume = conf.getBoolean(RESUME_KEY, false);
-    force = conf.getBoolean(FORCE_KEY, false);    
-    crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, 
Nutch.ALL_CRAWL_ID_STR));
+    force = conf.getBoolean(FORCE_KEY, false);
+    batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, 
Nutch.ALL_BATCH_ID_STR));
   }
 
   @Override
   public void map(String key, WebPage page, Context context)
       throws IOException, InterruptedException {
     Utf8 mark = Mark.FETCH_MARK.checkMark(page);
-    if (!NutchJob.shouldProcess(mark, crawlId)) {
+    if (!NutchJob.shouldProcess(mark, batchId)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
crawl id");
+        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different 
batch id");
       }
       return;
     }
@@ -87,7 +87,7 @@ public class ParserJob extends GoraMappe
       if (force) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; 
already parsed");
-        }        
+        }
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already 
parsed");
@@ -144,21 +144,21 @@ public class ParserJob extends GoraMappe
     this.conf = conf;
   }
 
-  public int parse(String crawlId, boolean shouldResume, boolean force) throws 
Exception {
+  public int parse(String batchId, boolean shouldResume, boolean force) throws 
Exception {
     LOG.info("ParserJob: starting");
 
-    if (crawlId != null) {
-      getConf().set(GeneratorJob.CRAWL_ID, crawlId);
+    if (batchId != null) {
+      getConf().set(GeneratorJob.BATCH_ID, batchId);
     }
     getConf().setBoolean(RESUME_KEY, shouldResume);
     getConf().setBoolean(FORCE_KEY, force);
 
     LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, 
false));
     LOG.info("ParserJob: forced reparse:\t" + getConf().getBoolean(FORCE_KEY, 
false));
-    if (crawlId == null || crawlId.equals(Nutch.ALL_CRAWL_ID_STR)) {
+    if (batchId == null || batchId.equals(Nutch.ALL_BATCH_ID_STR)) {
       LOG.info("ParserJob: parsing all");
     } else {
-      LOG.info("ParserJob: crawlId:\t" + crawlId);
+      LOG.info("ParserJob: batchId:\t" + batchId);
     }
 
     final Job job = new NutchJob(getConf(), "parse");
@@ -180,36 +180,39 @@ public class ParserJob extends GoraMappe
   public int run(String[] args) throws Exception {
     boolean shouldResume = false;
     boolean force = false;
-    String crawlId = null;
+    String batchId = null;
 
     if (args.length < 1) {
-      System.err.println("Usage: ParserJob (<crawlId> | -all) [-resume] 
[-force]");
-      System.err.println("\tcrawlId\tsymbolic crawl ID created by Generator");
+      System.err.println("Usage: ParserJob (<batchId> | -all) [-crawlId <id>] 
[-resume] [-force]");
+      System.err.println("\tbatchId\tsymbolic batch ID created by Generator");
+      System.err.println("\t-crawlId <id>\t the id to prefix the schemas to 
operate on, (default: storage.crawl.id)");
       System.err.println("\t-all\tconsider pages from all crawl jobs");
       System.err.println("-resume\tresume a previous incomplete job");
       System.err.println("-force\tforce re-parsing even if a page is already 
parsed");
       return -1;
     }
-    for (String s : args) {
-      if ("-resume".equals(s)) {
+    for (int i = 0; i < args.length; i++) {
+      if ("-resume".equals(args[i])) {
         shouldResume = true;
-      } else if ("-force".equals(s)) {
+      } else if ("-force".equals(args[i])) {
         force = true;
-      } else if ("-all".equals(s)) {
-        crawlId = s;
+      } else if ("-crawlId".equals(args[i])) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
+      } else if ("-all".equals(args[i])) {
+        batchId = args[i];
       } else {
-        if (crawlId != null) {
-          System.err.println("CrawlId already set to '" + crawlId + "'!");
+        if (batchId != null) {
+          System.err.println("BatchId already set to '" + batchId + "'!");
           return -1;
         }
-        crawlId = s;
+        batchId = args[i];
       }
     }
-    if (crawlId == null) {
-      System.err.println("CrawlId not set (or -all not specified)!");
+    if (batchId == null) {
+      System.err.println("BatchId not set (or -all not specified)!");
       return -1;
     }
-    return parse(crawlId, shouldResume, force);
+    return parse(batchId, shouldResume, force);
   }
 
   public static void main(String[] args) throws Exception {

Modified: nutch/trunk/src/java/org/apache/nutch/storage/StorageUtils.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/storage/StorageUtils.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/storage/StorageUtils.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/storage/StorageUtils.java Thu Oct 21 
12:01:36 2010
@@ -7,6 +7,7 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.metadata.Nutch;
 import org.gora.mapreduce.GoraMapper;
 import org.gora.mapreduce.GoraOutputFormat;
 import org.gora.mapreduce.GoraReducer;
@@ -27,6 +28,22 @@ public class StorageUtils {
   }
 
   @SuppressWarnings("unchecked")
+  public static <K, V extends Persistent> DataStore<K, V> 
createWebStore(Configuration conf,
+      Class<K> keyClass, Class<V> persistentClass) throws 
ClassNotFoundException {
+    String schema = conf.get("storage.schema", "webpage");
+    String crawlId = conf.get(Nutch.CRAWL_ID_KEY, "");
+
+    if (!crawlId.isEmpty()) {
+      schema = crawlId + "_" + schema;
+    }
+
+    Class<? extends DataStore<K, V>> dataStoreClass =
+      (Class<? extends DataStore<K, V>>) getDataStoreClass(conf);
+    return DataStoreFactory.createDataStore(dataStoreClass,
+            keyClass, persistentClass, schema);
+  }
+
+  @SuppressWarnings("unchecked")
   public static <K, V extends Persistent> Class<? extends DataStore<K, V>>
   getDataStoreClass(Configuration conf)  throws ClassNotFoundException {
     return (Class<? extends DataStore<K, V>>)
@@ -39,7 +56,8 @@ public class StorageUtils {
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass, boolean 
reuseObjects)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, 
reuseObjects);
+    initMapperJob(job, fields, outKeyClass, outValueClass,
+        mapperClass, null, reuseObjects);
   }
 
   public static <K, V> void initMapperJob(Job job,
@@ -47,7 +65,8 @@ public class StorageUtils {
       Class<K> outKeyClass, Class<V> outValueClass,
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, 
true);
+    initMapperJob(job, fields, outKeyClass, outValueClass,
+        mapperClass, null, true);
   }
 
   public static <K, V> void initMapperJob(Job job,
@@ -56,7 +75,8 @@ public class StorageUtils {
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
       Class<? extends Partitioner<K, V>> partitionerClass)
   throws ClassNotFoundException, IOException {
-    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, 
partitionerClass, true);
+    initMapperJob(job, fields, outKeyClass, outValueClass,
+        mapperClass, partitionerClass, true);
   }
 
   public static <K, V> void initMapperJob(Job job,
@@ -65,8 +85,8 @@ public class StorageUtils {
       Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
       Class<? extends Partitioner<K, V>> partitionerClass, boolean 
reuseObjects)
   throws ClassNotFoundException, IOException {
-    DataStore<String, WebPage> store =
-      createDataStore(job.getConfiguration(), String.class, WebPage.class);
+    DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
+        String.class, WebPage.class);
     if (store==null) throw new RuntimeException("Could not create datastore");
     Query<String, WebPage> query = store.newQuery();
     query.setFields(toStringArray(fields));
@@ -80,7 +100,7 @@ public class StorageUtils {
   throws ClassNotFoundException {
     Configuration conf = job.getConfiguration();
     DataStore<String, WebPage> store =
-      StorageUtils.createDataStore(conf, String.class, WebPage.class);
+      StorageUtils.createWebStore(conf, String.class, WebPage.class);
     GoraReducer.initReducerJob(job, store, reducerClass);
     GoraOutputFormat.setOutput(job, store, true);
   }

Modified: nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java Thu Oct 21 
12:01:36 2010
@@ -20,6 +20,7 @@ import org.apache.nutch.crawl.GeneratorJ
 import org.apache.nutch.crawl.InjectorJob;
 import org.apache.nutch.crawl.WebTableReader;
 import org.apache.nutch.fetcher.FetcherJob;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.parse.ParserJob;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
@@ -32,7 +33,7 @@ public class Benchmark extends Configure
     int res = ToolRunner.run(conf, new Benchmark(), args);
     System.exit(res);
   }
-  
+
   private void createSeeds(FileSystem fs, Path seedsDir, int count) throws 
Exception {
     OutputStream os = fs.create(new Path(seedsDir, "seeds"));
     for (int i = 0; i < count; i++) {
@@ -42,7 +43,7 @@ public class Benchmark extends Configure
     os.flush();
     os.close();
   }
-  
+
   public static final class BenchmarkResults {
     Map<String,Map<String,Long>> timings = new 
HashMap<String,Map<String,Long>>();
     List<String> runs = new ArrayList<String>();
@@ -51,7 +52,7 @@ public class Benchmark extends Configure
     long topN;
     long elapsed;
     String plugins;
-    
+
     public void addTiming(String stage, String run, long timing) {
       if (!runs.contains(run)) {
         runs.add(run);
@@ -66,7 +67,8 @@ public class Benchmark extends Configure
       }
       t.put(run, timing);
     }
-    
+
+    @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("* Plugins:\t" + plugins + "\n");
@@ -89,7 +91,7 @@ public class Benchmark extends Configure
       }
       return sb.toString();
     }
-    
+
     public List<String> getStages() {
       return stages;
     }
@@ -97,7 +99,7 @@ public class Benchmark extends Configure
       return runs;
     }
   }
-  
+
   public int run(String[] args) throws Exception {
     String plugins = 
"protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
     int seeds = 1;
@@ -105,9 +107,10 @@ public class Benchmark extends Configure
     int threads = 10;
     //boolean delete = true;
     long topN = Long.MAX_VALUE;
-    
+
     if (args.length == 0) {
-      System.err.println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads 
NN] [-maxPerHost NN] [-plugins <regex>]");
+      System.err.println("Usage: Benchmark [-crawlId <id>] [-seeds NN] [-depth 
NN] [-threads NN] [-maxPerHost NN] [-plugins <regex>]");
+      System.err.println("\t-crawlId id\t the id to prefix the schemas to 
operate on, (default: storage.crawl.id)");
       System.err.println("\t-seeds NN\tcreate NN unique hosts in a seed list 
(default: 1)");
       System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
       System.err.println("\t-threads NN\tuse NN threads per Fetcher task 
(default: 10)");
@@ -121,7 +124,9 @@ public class Benchmark extends Configure
     }
     int maxPerHost = Integer.MAX_VALUE;
     for (int i = 0; i < args.length; i++) {
-      if (args[i].equals("-seeds")) {
+      if (args[i].equals("-crawlId")) {
+        getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
+      } else if (args[i].equals("-seeds")) {
         seeds = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-threads")) {
         threads = Integer.parseInt(args[++i]);
@@ -140,7 +145,7 @@ public class Benchmark extends Configure
     System.out.println(res);
     return 0;
   }
-  
+
   public BenchmarkResults benchmark(int seeds, int depth, int threads, int 
maxPerHost,
         long topN, String plugins) throws Exception {
     Configuration conf = getConf();
@@ -153,7 +158,7 @@ public class Benchmark extends Configure
     }
     conf.setInt(GeneratorJob.GENERATOR_MAX_COUNT, maxPerHost);
     conf.set(GeneratorJob.GENERATOR_COUNT_MODE, 
GeneratorJob.GENERATOR_COUNT_VALUE_HOST);
-    Job job = new NutchJob(conf);    
+    Job job = new NutchJob(conf);
     FileSystem fs = FileSystem.get(job.getConfiguration());
     Path dir = new Path(getConf().get("hadoop.tmp.dir"),
             "bench-" + System.currentTimeMillis());
@@ -166,16 +171,16 @@ public class Benchmark extends Configure
       LOG.info("crawl started in: " + dir);
       LOG.info("rootUrlDir = " + rootUrlDir);
       LOG.info("threads = " + threads);
-      LOG.info("depth = " + depth);      
+      LOG.info("depth = " + depth);
     }
-    
+
     BenchmarkResults res = new BenchmarkResults();
     res.depth = depth;
     res.plugins = plugins;
     res.seeds = seeds;
     res.threads = threads;
     res.topN = topN;
-    
+
     res.elapsed = System.currentTimeMillis();
     InjectorJob injector = new InjectorJob(conf);
     GeneratorJob generator = new GeneratorJob(conf);
@@ -184,7 +189,7 @@ public class Benchmark extends Configure
     DbUpdaterJob crawlDbTool = new DbUpdaterJob(conf);
     // not needed in the new API
     //LinkDb linkDbTool = new LinkDb(getConf());
-    
+
     long start = System.currentTimeMillis();
     // initialize crawlDb
     injector.inject(rootUrlDir);
@@ -193,22 +198,22 @@ public class Benchmark extends Configure
     int i;
     for (i = 0; i < depth; i++) {             // generate new segment
       start = System.currentTimeMillis();
-      String crawlId = generator.generate(topN, System.currentTimeMillis(),
+      String batchId = generator.generate(topN, System.currentTimeMillis(),
               false, false);
       delta = System.currentTimeMillis() - start;
       res.addTiming("generate", i + "", delta);
-      if (crawlId == null) {
+      if (batchId == null) {
         LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
         break;
       }
       boolean isParsing = getConf().getBoolean("fetcher.parse", true);
       start = System.currentTimeMillis();
-      fetcher.fetch(crawlId, threads, false, isParsing, -1);  // fetch it
+      fetcher.fetch(batchId, threads, false, isParsing, -1);  // fetch it
       delta = System.currentTimeMillis() - start;
       res.addTiming("fetch", i + "", delta);
       if (!isParsing) {
         start = System.currentTimeMillis();
-        parseSegment.parse(crawlId, false, false);    // parse it, if needed
+        parseSegment.parse(batchId, false, false);    // parse it, if needed
         delta = System.currentTimeMillis() - start;
         res.addTiming("parse", i + "", delta);
       }

Modified: nutch/trunk/src/java/org/apache/nutch/util/NutchJob.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/NutchJob.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/NutchJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/util/NutchJob.java Thu Oct 21 
12:01:36 2010
@@ -37,12 +37,12 @@ public class NutchJob extends Job {
     setJarByClass(this.getClass());
   }
 
-  public static boolean shouldProcess(Utf8 mark, Utf8 crawlId) {
+  public static boolean shouldProcess(Utf8 mark, Utf8 batchId) {
     if (mark == null) {
       return false;
     }
-    boolean isAll = crawlId.equals(Nutch.ALL_CRAWL_ID);
-    if (!isAll && !mark.equals(crawlId)) {
+    boolean isAll = batchId.equals(Nutch.ALL_CRAWL_ID);
+    if (!isAll && !mark.equals(batchId)) {
       return false;
     }
     return true;

Modified: nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java Thu Oct 21 
12:01:36 2010
@@ -41,7 +41,7 @@ import org.apache.nutch.util.TableUtil;
 public class TestGenerator extends AbstractNutchTest {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(TestGenerator.class);
-  
+
   private static String[] FIELDS = new String[] {
     WebPage.Field.MARKERS.getName(),
     WebPage.Field.SCORE.getName()
@@ -172,7 +172,7 @@ public class TestGenerator extends Abstr
       webPageStore.put(TableUtil.reverseUrl(uwp.getUrl()), uwp.getDatum());
     }
     webPageStore.flush();
-    
+
     Configuration myConfiguration = new Configuration(conf);
     myConfiguration.setInt(GeneratorJob.GENERATOR_MAX_COUNT, 1);
     myConfiguration.set(GeneratorJob.GENERATOR_COUNT_MODE, 
GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN);
@@ -255,8 +255,8 @@ public class TestGenerator extends Abstr
     // generate segment
     GeneratorJob g = new GeneratorJob();
     g.setConf(config);
-    String crawlId = g.generate(numResults, System.currentTimeMillis(), 
filter, false);
-    if (crawlId == null)
+    String batchId = g.generate(numResults, System.currentTimeMillis(), 
filter, false);
+    if (batchId == null)
       throw new RuntimeException("Generator failed");
   }
 

Modified: nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java Thu Oct 21 
12:01:36 2010
@@ -44,7 +44,7 @@ import org.junit.Before;
  * Basic injector test: 1. Creates a text file with urls 2. Injects them into
  * crawldb 3. Reads crawldb entries and verifies contents 4. Injects more urls
  * into webdb 5. Reads crawldb entries and verifies contents
- * 
+ *
  * @author nutch-dev <nutch-dev at lucene.apache.org>
  */
 public class TestInjector extends AbstractNutchTest {
@@ -104,13 +104,13 @@ public class TestInjector extends Abstra
     assertTrue(urls.containsAll(read));
 
   }
-  
+
   private static final String[] fields = new String[] {
     WebPage.Field.MARKERS.getName(),
     WebPage.Field.METADATA.getName(),
     WebPage.Field.SCORE.getName()
   };
-  
+
   private List<String> readDb() throws Exception {
     List<URLWebPage> pages = CrawlTestUtil.readContents(webPageStore, null, 
fields);
     ArrayList<String> read = new ArrayList<String>();

Modified: nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java Thu Oct 21 
12:01:36 2010
@@ -46,6 +46,7 @@ public class TestFetcher extends Abstrac
   Path urlPath;
   Server server;
 
+  @Override
   public void setUp() throws Exception{
     super.setUp();
     urlPath = new Path(testdir, "urls");
@@ -53,25 +54,26 @@ public class TestFetcher extends Abstrac
     server.start();
   }
 
+  @Override
   public void tearDown() throws Exception{
     server.stop();
     fs.delete(testdir, true);
   }
-  
+
   public void testFetch() throws Exception {
-    
+
     //generate seedlist
     ArrayList<String> urls = new ArrayList<String>();
-    
+
     addUrl(urls,"index.html");
     addUrl(urls,"pagea.html");
     addUrl(urls,"pageb.html");
     addUrl(urls,"dup_of_pagea.html");
     addUrl(urls,"nested_spider_trap.html");
     addUrl(urls,"exception.html");
-    
+
     CrawlTestUtil.generateSeedList(fs, urlPath, urls);
-    
+
     //inject
     InjectorJob injector = new InjectorJob(conf);
     injector.inject(urlPath);
@@ -79,21 +81,21 @@ public class TestFetcher extends Abstrac
     //generate
     long time = System.currentTimeMillis();
     GeneratorJob g = new GeneratorJob(conf);
-    String crawlId = g.generate(Long.MAX_VALUE, time, false, false);
+    String batchId = g.generate(Long.MAX_VALUE, time, false, false);
 
     //fetch
     time = System.currentTimeMillis();
     conf.setBoolean(FetcherJob.PARSE_KEY, true);
     FetcherJob fetcher = new FetcherJob(conf);
-    fetcher.fetch(crawlId, 1, false, true, -1);
+    fetcher.fetch(batchId, 1, false, true, -1);
 
     time = System.currentTimeMillis() - time;
-    
+
     //verify politeness, time taken should be more than (num_of_pages +1)*delay
     int minimumTime = (int) ((urls.size() + 1) * 1000 *
         conf.getFloat("fetcher.server.delay", 5));
     assertTrue(time > minimumTime);
-    
+
     List<URLWebPage> pages = CrawlTestUtil.readContents(webPageStore, 
Mark.FETCH_MARK, (String[])null);
     assertEquals(urls.size(), pages.size());
     List<String> handledurls = new ArrayList<String>();
@@ -104,7 +106,7 @@ public class TestFetcher extends Abstrac
       }
       String content = new String(bb.array());
       if (content.indexOf("Nutch fetcher test page")!=-1) {
-        handledurls.add(up.getUrl());        
+        handledurls.add(up.getUrl());
       }
     }
     Collections.sort(urls);
@@ -121,7 +123,7 @@ public class TestFetcher extends Abstrac
   private void addUrl(ArrayList<String> urls, String page) {
    urls.add("http://127.0.0.1:" + server.getConnectors()[0].getPort() + "/" + 
page);
   }
-  
+
   public void testAgentNameCheck() {
 
     boolean failedNoAgentName = false;

Modified: nutch/trunk/src/test/org/apache/nutch/util/AbstractNutchTest.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/util/AbstractNutchTest.java?rev=1025963&r1=1025962&r2=1025963&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/util/AbstractNutchTest.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/util/AbstractNutchTest.java Thu Oct 
21 12:01:36 2010
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nutch.crawl.URLWebPage;
 import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.TableUtil;
 import org.gora.query.Query;
@@ -49,11 +50,12 @@ public class AbstractNutchTest extends T
   protected Path testdir = new Path("build/test/inject-test");
   protected DataStore<String, WebPage> webPageStore;
   protected boolean persistentDataStore = false;
-  
+
   @Override
   public void setUp() throws Exception {
     super.setUp();
     conf = CrawlTestUtil.createConfiguration();
+    conf.set("storage.data.store.class", "org.gora.sql.store.SqlStore");
     fs = FileSystem.get(conf);
     // using hsqldb in memory
     
DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
@@ -61,13 +63,8 @@ public class AbstractNutchTest extends T
     
DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url","jdbc:hsqldb:mem:"
 + getClass().getName());
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
     DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
-    if (persistentDataStore) {
-      webPageStore = DataStoreFactory.getDataStore(SqlStore.class,
-          String.class, WebPage.class);      
-    } else {
-      webPageStore = DataStoreFactory.createDataStore(SqlStore.class,
-          String.class, WebPage.class);
-    }
+    webPageStore = StorageUtils.createWebStore(conf, String.class,
+        WebPage.class);
   }
 
   @Override


Reply via email to