Author: jnioche
Date: Tue Jul 15 09:16:47 2014
New Revision: 1610628
URL: http://svn.apache.org/r1610628
Log:
NUTCH-1502 Test for CrawlDatum state transitions (snagel)
Added:
nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java
nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java
nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java
nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java
Modified:
nutch/trunk/CHANGES.txt
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1610628&r1=1610627&r2=1610628&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Jul 15 09:16:47 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development
+* NUTCH-1502 Test for CrawlDatum state transitions (snagel)
+
* NUTCH-1804 Move JUnit dependency to test scope (jnioche)
* NUTCH-1811 bin/nutch junit to use junit 4 test runner (snagel)
Added: nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java?rev=1610628&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/ContinuousCrawlTestUtil.java Tue Jul 15 09:16:47 2014
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.crawl.CrawlDbUpdateUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Emulate a continuous crawl for one URL.
+ *
+ */
+public class ContinuousCrawlTestUtil extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousCrawlTestUtil.class);
+
+ protected static Text dummyURL = new Text("http://nutch.apache.org/");
+
+ protected static Configuration defaultConfig = CrawlDBTestUtil
+ .createConfiguration();
+
+ protected long interval = FetchSchedule.SECONDS_PER_DAY*1000; // (default) launch crawler every day
+ protected long duration = 2*365L*FetchSchedule.SECONDS_PER_DAY*1000L; // run for two years
+
+ protected Configuration configuration;
+ private FetchSchedule schedule;
+
+ /** status a fetched datum should get */
+ protected byte fetchStatus = CrawlDatum.STATUS_FETCH_SUCCESS;
+ /** expected status of the resulting Db datum */
+ protected byte expectedDbStatus = CrawlDatum.STATUS_DB_FETCHED;
+
+ /** for signature calculation */
+ protected Signature signatureImpl;
+ protected Content content = new Content();
+
+ {
+ byte[] data = {'n', 'u', 't', 'c', 'h'};
+ content.setContent(data);
+ }
+
+ protected ContinuousCrawlTestUtil(Configuration conf) {
+ configuration = conf;
+ schedule = FetchScheduleFactory.getFetchSchedule(new JobConf(conf));
+ signatureImpl = SignatureFactory.getSignature(conf);
+ }
+
+ protected ContinuousCrawlTestUtil(Configuration conf, byte fetchStatus,
+ byte expectedDbStatus) {
+ this(conf);
+ this.fetchStatus = fetchStatus;
+ this.expectedDbStatus = expectedDbStatus;
+ }
+
+ protected ContinuousCrawlTestUtil() {
+ this(defaultConfig);
+ }
+
+ protected ContinuousCrawlTestUtil(byte fetchStatus, byte expectedDbStatus) {
+ this(defaultConfig, fetchStatus, expectedDbStatus);
+ }
+
+ /** set the interval the crawl is relaunched (default: every day) */
+ protected void setInterval(int seconds) {
+ interval = seconds*1000L;
+ }
+
+ /** set the duration of the continuous crawl (default = 2 years) */
+ protected void setDuration(int seconds) {
+ duration = seconds*1000L;
+ }
+
+ /**
+ * default fetch action: set status and time
+ *
+ * @param datum
+ * CrawlDatum to fetch
+ * @param currentTime
+ * current time used to set the fetch time via
+ * {@link CrawlDatum#setFetchTime(long)}
+ * @return the modified CrawlDatum
+ */
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ datum.setStatus(fetchStatus);
+ datum.setFetchTime(currentTime);
+ return datum;
+ }
+
+ /**
+ * get signature for content and configured signature implementation
+ */
+ protected byte[] getSignature() {
+ return signatureImpl.calculate(content, null);
+ }
+
+ /**
+ * change content to force a changed signature
+ */
+ protected void changeContent() {
+ byte[] data = Arrays.copyOf(content.getContent(), content.getContent().length+1);
+ data[content.getContent().length] = '2'; // append one byte
+ content.setContent(data);
+ LOG.info("document content changed");
+ }
+
+
+ /**
+ * default parse action: add signature if successfully fetched
+ *
+ * @param fetchDatum
+ * fetch datum
+ * @return list of all datums resulting from parse (status: signature, linked, parse_metadata)
+ */
+ protected List<CrawlDatum> parse(CrawlDatum fetchDatum) {
+ List<CrawlDatum> parseDatums = new ArrayList<CrawlDatum>(0);
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS) {
+ CrawlDatum signatureDatum = new CrawlDatum(CrawlDatum.STATUS_SIGNATURE, 0);
+ signatureDatum.setSignature(getSignature());
+ parseDatums.add(signatureDatum);
+ }
+ return parseDatums;
+ }
+
+ /**
+ * default implementation to check the result state
+ *
+ * @param datum
+ * the CrawlDatum to be checked
+ * @return true if the check succeeds
+ */
+ protected boolean check(CrawlDatum datum) {
+ if (datum.getStatus() != expectedDbStatus)
+ return false;
+ return true;
+ }
+
+ /**
+ * Run the continuous crawl.
+ * <p>
+ * A loop emulates a continuous crawl launched at regular intervals (see
+ * {@link #setInterval(int)}) over a longer period ({@link #setDuration(int)}).
+ *
+ * <ul>
+ * <li>every "round" emulates
+ * <ul>
+ * <li>a fetch (see {@link #fetch(CrawlDatum, long)})</li>
+ * <li>{@literal updatedb} which returns a {@link CrawlDatum}</li>
+ * </ul></li>
+ * <li>the returned CrawlDatum is used as input for the next round</li>
+ * <li>and is checked for correctness (see {@link #check(CrawlDatum)})</li>
+ * </ul>
+ * </p>
+ *
+ * @param maxErrors
+ * (if > 0) continue crawl even if the checked CrawlDatum is not
+ * correct, but stop after max. number of errors
+ *
+ * @return false if a check of CrawlDatum failed, true otherwise
+ */
+ protected boolean run(int maxErrors) {
+
+ long now = System.currentTimeMillis();
+
+ CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>(
+ new CrawlDbReducer(), configuration);
+
+ /* start with a db_unfetched */
+ CrawlDatum dbDatum = new CrawlDatum();
+ dbDatum.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+ schedule.initializeSchedule(dummyURL, dbDatum); // initialize fetchInterval
+ dbDatum.setFetchTime(now);
+
+ LOG.info("Emulate a continuous crawl, launched every "
+ + (interval / (FetchSchedule.SECONDS_PER_DAY * 1000)) + " day ("
+ + (interval / 1000) + " seconds)");
+ long maxTime = (now + duration);
+ long nextTime = now;
+ long lastFetchTime = -1;
+ boolean ok = true; // set to false if a check fails but the crawl continues
+ CrawlDatum fetchDatum = new CrawlDatum();
+ /* Keep copies because CrawlDbReducer.reduce()
+ * and FetchSchedule.shouldFetch() may alter the references.
+ * Copies are used for verbose logging in case of an error. */
+ CrawlDatum copyDbDatum = new CrawlDatum();
+ CrawlDatum copyFetchDatum = new CrawlDatum();
+ CrawlDatum afterShouldFetch = new CrawlDatum();
+ int errorCount = 0;
+ while (nextTime < maxTime) {
+ LOG.info("check: " + new Date(nextTime));
+ fetchDatum.set(dbDatum);
+ copyDbDatum.set(dbDatum);
+ if (schedule.shouldFetch(dummyURL, fetchDatum, nextTime)) {
+ LOG.info("... fetching now (" + new Date(nextTime) + ")");
+ if (lastFetchTime > -1) {
+ LOG.info("(last fetch: " + new Date(lastFetchTime) + " = "
+ + TimingUtil.elapsedTime(lastFetchTime, nextTime) + " ago)");
+ }
+ lastFetchTime = nextTime;
+ afterShouldFetch.set(fetchDatum);
+ fetchDatum = fetch(fetchDatum, nextTime);
+ copyFetchDatum.set(fetchDatum);
+ List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+ values.add(dbDatum);
+ values.add(fetchDatum);
+ values.addAll(parse(fetchDatum));
+ List<CrawlDatum> res = updateDb.update(values);
+ assertNotNull("null returned", res);
+ assertFalse("no CrawlDatum", 0 == res.size());
+ assertEquals("more than one CrawlDatum", 1, res.size());
+ if (!check(res.get(0))) {
+ LOG.info("previously in CrawlDb: " + copyDbDatum);
+ LOG.info("after shouldFetch(): " + afterShouldFetch);
+ LOG.info("fetch: " + fetchDatum);
+ LOG.warn("wrong result in CrawlDb: " + res.get(0));
+ if (++errorCount >= maxErrors) {
+ if (maxErrors > 0) {
+ LOG.error("Max. number of errors " + maxErrors
+ + " reached. Stopping.");
+ }
+ return false;
+ } else {
+ ok = false; // record failure but keep going
+ }
+ }
+ /* use the returned CrawlDatum for the next fetch */
+ dbDatum = res.get(0);
+ }
+ nextTime += interval;
+ }
+ return ok;
+ }
+
+}
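The utility above is designed to be subclassed: a test overrides fetch() and/or check() and then calls run(). A minimal sketch of such a subclass (a hypothetical example for illustration, not part of this commit):

    public class ExampleGoneTest extends ContinuousCrawlTestUtil {

      public ExampleGoneTest() {
        // expect db_gone in CrawlDb whenever the fetcher reports fetch_gone
        super(CrawlDatum.STATUS_FETCH_GONE, CrawlDatum.STATUS_DB_GONE);
      }

      public void testGone() {
        // stop at the first wrong CrawlDatum (maxErrors = 1)
        assertTrue("fetch_gone must result in db_gone", run(1));
      }
    }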
Added: nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java?rev=1610628&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDbUpdateUtil.java Tue Jul 15 09:16:47 2014
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to test transitions of {@link CrawlDatum} states during an update of
+ * {@link CrawlDb} (command {@literal updatedb}): call
+ * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)} with
+ * the old CrawlDatum (db status) and the new one (fetch status)
+ */
+public class CrawlDbUpdateUtil<T extends Reducer<Text, CrawlDatum, Text, CrawlDatum>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CrawlDbUpdateUtil.class);
+
+ private T reducer;
+
+ public static Text dummyURL = new Text("http://nutch.apache.org/");
+
+ protected CrawlDbUpdateUtil(T red, Configuration conf) {
+ reducer = red;
+ reducer.configure(new JobConf(conf));
+ }
+
+ /** {@link OutputCollector} to collect all values in a {@link List} */
+ private class ListOutputCollector implements
+ OutputCollector<Text, CrawlDatum> {
+
+ private List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+
+ public void collect(Text key, CrawlDatum value) throws IOException {
+ values.add(value);
+ }
+
+ /** collected values as list */
+ public List<CrawlDatum> getValues() {
+ return values;
+ }
+
+ }
+
+ /**
+ * Dummy reporter which does nothing and does not return null for
+ * getCounter()
+ *
+ * @see Reporter#NULL
+ */
+ private class DummyReporter implements Reporter {
+
+ private Counters dummyCounters = new Counters();
+
+ public void progress() {
+ }
+
+ public Counter getCounter(Enum<?> arg0) {
+ return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+ }
+
+ public Counter getCounter(String arg0, String arg1) {
+ return dummyCounters.getGroup("dummy").getCounterForName("dummy");
+ }
+
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Dummy reporter without input");
+ }
+
+ public void incrCounter(Enum<?> arg0, long arg1) {
+ }
+
+ public void incrCounter(String arg0, String arg1, long arg2) {
+ }
+
+ public void setStatus(String arg0) {
+ }
+
+ public float getProgress() {
+ return 1f;
+ }
+
+ }
+
+ /**
+ * run
+ * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)}
+ * and return the CrawlDatum(s) which would have been written into CrawlDb
+ * @param values list of input CrawlDatums
+ * @return list of resulting CrawlDatum(s) in CrawlDb
+ */
+ public List<CrawlDatum> update(List<CrawlDatum> values) {
+ if (values == null || values.size() == 0) {
+ return new ArrayList<CrawlDatum>(0);
+ }
+ Collections.shuffle(values); // sorting of values should have no influence
+ ListOutputCollector output = new ListOutputCollector();
+ try {
+ reducer.reduce(dummyURL, values.iterator(), output, new DummyReporter());
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ return output.getValues();
+ }
+
+ /**
+ * run
+ * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)}
+ * and return the CrawlDatum(s) which would have been written into CrawlDb
+ * @param dbDatum previous CrawlDatum in CrawlDb
+ * @param fetchDatum CrawlDatum resulting from fetching
+ * @return list of resulting CrawlDatum(s) in CrawlDb
+ */
+ public List<CrawlDatum> update(CrawlDatum dbDatum,
+ CrawlDatum fetchDatum) {
+ List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+ if (dbDatum != null)
+ values.add(dbDatum);
+ if (fetchDatum != null)
+ values.add(fetchDatum);
+ return update(values);
+ }
+
+ /**
+ * see {@link #update(List)}
+ */
+ public List<CrawlDatum> update(CrawlDatum... values) {
+ return update(Arrays.asList(values));
+ }
+
+}
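To illustrate how this utility drives CrawlDbReducer, here is a hedged usage sketch (a hypothetical snippet from the same test package, not part of this commit) simulating one updatedb step for a previously unfetched URL:

    Configuration conf = CrawlDBTestUtil.createConfiguration();
    CrawlDbUpdateUtil<CrawlDbReducer> updateDb =
        new CrawlDbUpdateUtil<CrawlDbReducer>(new CrawlDbReducer(), conf);

    // old state in CrawlDb: db_unfetched with a 30-day fetch interval
    CrawlDatum dbDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, 2592000);
    // new state from the fetcher: fetch_success
    CrawlDatum fetchDatum = new CrawlDatum(CrawlDatum.STATUS_FETCH_SUCCESS, 2592000);
    fetchDatum.setFetchTime(System.currentTimeMillis());

    // reduce() should emit exactly one CrawlDatum with status db_fetched
    List<CrawlDatum> res = updateDb.update(dbDatum, fetchDatum);
    assertEquals(1, res.size());
    assertEquals(CrawlDatum.STATUS_DB_FETCHED, res.get(0).getStatus());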
Added: nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java?rev=1610628&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/TODOTestCrawlDbStates.java Tue Jul 15 09:16:47 2014
@@ -0,0 +1,242 @@
+package org.apache.nutch.crawl;
+
+import static org.apache.nutch.crawl.CrawlDatum.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.TimingUtil;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TODOTestCrawlDbStates extends TestCrawlDbStates {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TODOTestCrawlDbStates.class);
+
+ /**
+ * NUTCH-578: a fetch_retry should result in a db_gone if db.fetch.retry.max is reached.
+ * Retry counter has to be reset appropriately.
+ */
+ @Test
+ public void testCrawlDbReducerPageRetrySchedule() {
+ LOG.info("NUTCH-578: test long running continuous crawl with fetch_retry");
+ ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestFetchRetry();
+ // keep going for long, to "provoke" a retry counter overflow
+ if (!crawlUtil.run(150)) {
+ fail("fetch_retry did not result in a db_gone if retry counter >
maxRetries (NUTCH-578)");
+ }
+ }
+
+ private class ContinuousCrawlTestFetchRetry extends ContinuousCrawlTestUtil {
+
+ private int retryMax = 3;
+ private int totalRetries = 0;
+
+ ContinuousCrawlTestFetchRetry() {
+ super();
+ fetchStatus = STATUS_FETCH_RETRY;
+ retryMax = configuration.getInt("db.fetch.retry.max", retryMax);
+ }
+
+ @Override
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ datum.setStatus(fetchStatus);
+ datum.setFetchTime(currentTime);
+ totalRetries++;
+ return datum;
+ }
+
+ @Override
+ protected boolean check(CrawlDatum result) {
+ if (result.getRetriesSinceFetch() > retryMax) {
+ LOG.warn("Retry counter > db.fetch.retry.max: " + result);
+ } else if (result.getRetriesSinceFetch() == Byte.MAX_VALUE) {
+ LOG.warn("Retry counter max. value reached (overflow imminent): "
+ + result);
+ } else if (result.getRetriesSinceFetch() < 0) {
+ LOG.error("Retry counter overflow: " + result);
+ return false;
+ }
+ // use retry counter bound to this class (totalRetries)
+ // instead of result.getRetriesSinceFetch() because the retry counter
+ // in CrawlDatum could be reset (eg. NUTCH-578_v5.patch)
+ if (totalRetries < retryMax) {
+ if (result.getStatus() == STATUS_DB_UNFETCHED) {
+ LOG.info("ok: " + result);
+ return true;
+ }
+ } else {
+ if (result.getStatus() == STATUS_DB_GONE) {
+ LOG.info("ok: " + result);
+ return true;
+ }
+ }
+ LOG.warn("wrong: " + result);
+ return false;
+ }
+
+ }
+
+ /**
+ * NUTCH-1564 AdaptiveFetchSchedule: sync_delta forces immediate re-fetch for
+ * documents not modified
+ * <p>
+ * Problem: documents not modified for a longer time are fetched in every
+ * cycle because of an error in the SYNC_DELTA calculation of
+ * {@link AdaptiveFetchSchedule}.
+ * <br>
+ * The next fetch time should always be in the future, never in the past.
+ * </p>
+ */
+ @Test
+ public void testAdaptiveFetchScheduleSyncDelta() {
+ LOG.info("NUTCH-1564 test SYNC_DELTA calculation of
AdaptiveFetchSchedule");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ conf.setLong("db.fetch.interval.default", 172800); // 2 days
+ conf.setLong("db.fetch.schedule.adaptive.min_interval", 86400); // 1 day
+ conf.setLong("db.fetch.schedule.adaptive.max_interval", 604800); // 7 days
+ conf.setLong("db.fetch.interval.max", 604800); // 7 days
+ conf.set("db.fetch.schedule.class",
+ "org.apache.nutch.crawl.AdaptiveFetchSchedule");
+ ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchScheduleNotModifiedFetchTime(
+ conf);
+ crawlUtil.setInterval(FetchSchedule.SECONDS_PER_DAY/3);
+ if (!crawlUtil.run(100)) {
+ fail("failed: sync_delta calculation with AdaptiveFetchSchedule");
+ }
+ }
+
+ private class CrawlTestFetchScheduleNotModifiedFetchTime extends
+ CrawlTestFetchNotModified {
+
+ // time of current fetch
+ private long fetchTime;
+
+ private long minInterval;
+ private long maxInterval;
+
+ CrawlTestFetchScheduleNotModifiedFetchTime(Configuration conf) {
+ super(conf);
+ minInterval = conf.getLong("db.fetch.schedule.adaptive.min_interval",
+ 86400); // 1 day
+ maxInterval = conf.getLong("db.fetch.schedule.adaptive.max_interval",
+ 604800); // 7 days
+ if (conf.getLong("db.fetch.interval.max", 604800) < maxInterval) {
+ maxInterval = conf.getLong("db.fetch.interval.max", 604800);
+ }
+ }
+
+ @Override
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ // remember time of fetching
+ fetchTime = currentTime;
+ return super.fetch(datum, currentTime);
+ }
+
+ @Override
+ protected boolean check(CrawlDatum result) {
+ if (result.getStatus() == STATUS_DB_NOTMODIFIED) {
+ // check only status notmodified here
+ long secondsUntilNextFetch = (result.getFetchTime() - fetchTime) / 1000L;
+ if (secondsUntilNextFetch < -1) {
+ // next fetch time is in the past (more than one second)
+ LOG.error("Next fetch time is in the past: " + result);
+ return false;
+ }
+ if (secondsUntilNextFetch < 60) {
+ // next fetch time is in less than one minute
+ // (critical: Nutch can hardly be so fast)
+ LOG.error("Less then one minute until next fetch: " + result);
+ }
+ // Next fetch time should be within min. and max. (tolerance: 60 sec.)
+ if (secondsUntilNextFetch+60 < minInterval
+ || secondsUntilNextFetch-60 > maxInterval) {
+ LOG.error("Interval until next fetch time ("
+ + TimingUtil.elapsedTime(fetchTime, result.getFetchTime())
+ + ") is not within min. and max. interval: " + result);
+ // TODO: is this a failure?
+ }
+ }
+ return true;
+ }
+
+ }
+
+ /**
+ * Test whether signatures are reset for "content-less" states
+ * (gone, redirect, etc.): otherwise, if this state is temporary
+ * and the document appears again with the old content, it may
+ * get marked as not_modified in CrawlDb just after the redirect
+ * state. In this case we cannot expect content in segments.
+ * Cf. NUTCH-1422: reset signature for redirects.
+ */
+ // TODO: can only test if solution is done in CrawlDbReducer
+ @Test
+ public void testSignatureReset() {
+ LOG.info("NUTCH-1422 must reset signature for redirects and similar
states");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ for (String sched : schedules) {
+ LOG.info("Testing reset signature with " + sched);
+ conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched);
+ ContinuousCrawlTestUtil crawlUtil = new CrawlTestSignatureReset(conf);
+ if (!crawlUtil.run(20)) {
+ fail("failed: signature not reset");
+ }
+ }
+ }
+
+ private class CrawlTestSignatureReset extends ContinuousCrawlTestUtil {
+
+ byte[][] noContentStates = {
+ { STATUS_FETCH_GONE, STATUS_DB_GONE },
+ { STATUS_FETCH_REDIR_TEMP, STATUS_DB_REDIR_TEMP },
+ { STATUS_FETCH_REDIR_PERM, STATUS_DB_REDIR_PERM } };
+
+ int counter = 0;
+ byte fetchState;
+
+ public CrawlTestSignatureReset(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ datum = super.fetch(datum, currentTime);
+ counter++;
+ // alternate between a successful fetch and one of the content-less states
+ if (counter%2 == 1) {
+ fetchState = STATUS_FETCH_SUCCESS;
+ } else {
+ fetchState = noContentStates[(counter%6)/2][0];
+ }
+ LOG.info("Step " + counter + ": fetched with "
+ + getStatusName(fetchState));
+ datum.setStatus(fetchState);
+ return datum;
+ }
+
+ @Override
+ protected boolean check(CrawlDatum result) {
+ if (result.getStatus() == STATUS_DB_NOTMODIFIED
+ && !(fetchState == STATUS_FETCH_SUCCESS || fetchState ==
STATUS_FETCH_NOTMODIFIED)) {
+ LOG.error("Should never get into state "
+ + getStatusName(STATUS_DB_NOTMODIFIED) + " from "
+ + getStatusName(fetchState));
+ return false;
+ }
+ if (result.getSignature() != null
+ && !(result.getStatus() == STATUS_DB_FETCHED || result.getStatus()
== STATUS_DB_NOTMODIFIED)) {
+ LOG.error("Signature not reset in state "
+ + getStatusName(result.getStatus()));
+ // no failure here: the unreset signature is not the problem itself,
+ // but the cause of a later wrong db_notmodified
+ }
+ return true;
+ }
+
+ }
+
+}
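A side note on the retry-counter checks in ContinuousCrawlTestFetchRetry above: retries are stored in a byte, so a counter that is never reset eventually wraps around to a negative value. A minimal illustration (hypothetical snippet, not part of this commit):

    // Byte.MAX_VALUE is 127; one more increment wraps around to -128,
    // which is what the check result.getRetriesSinceFetch() < 0 detects
    byte retries = Byte.MAX_VALUE;
    retries++;
    System.out.println(retries); // prints -128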
Added: nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java?rev=1610628&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java Tue Jul 15 09:16:47 2014
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import static org.apache.nutch.crawl.CrawlDatum.*;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test transitions of {@link CrawlDatum} states during an update of
+ * {@link CrawlDb} (command {@literal updatedb}):
+ * <ul>
+ * <li>simulate updatedb with the old CrawlDatum (db status) and the new one
+ * (fetch status) and test whether the resulting CrawlDatum has the appropriate
+ * status.</li>
+ * <li>also check for further CrawlDatum fields (signature, etc.)</li>
+ * <li>and additional conditions:</li>
+ * <ul>
+ * <li>retry counters</li>
+ * <li>signatures</li>
+ * <li>configuration properties</li>
+ * <li>(additional) CrawlDatums of status linked (stemming from inlinks)</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+public class TestCrawlDbStates {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestCrawlDbStates.class);
+
+ protected static final byte[][] fetchDbStatusPairs = {
+ { -1, STATUS_DB_UNFETCHED },
+ { STATUS_FETCH_SUCCESS, STATUS_DB_FETCHED },
+ { STATUS_FETCH_GONE, STATUS_DB_GONE },
+ { STATUS_FETCH_REDIR_TEMP, STATUS_DB_REDIR_TEMP },
+ { STATUS_FETCH_REDIR_PERM, STATUS_DB_REDIR_PERM },
+ { STATUS_FETCH_NOTMODIFIED, STATUS_DB_NOTMODIFIED },
+ { STATUS_FETCH_RETRY, -1 }, // fetch_retry does not have a CrawlDb counter-part
+ { -1, STATUS_DB_DUPLICATE },
+ };
+
+ /** tested {@link FetchSchedule} implementations */
+ protected String[] schedules = {"DefaultFetchSchedule", "AdaptiveFetchSchedule"};
+
+ /** CrawlDatum as result of a link */
+ protected final CrawlDatum linked = new CrawlDatum(STATUS_LINKED,
+ CrawlDBTestUtil.createConfiguration().getInt("db.fetch.interval.default",
+ 2592000), 0.1f);
+
+ /**
+ * Test the matrix of state transitions:
+ * <ul>
+ * <li>for all available {@link FetchSchedule} implementations</li>
+ * <li>for every possible status in CrawlDb (including "not in CrawlDb")</li>
+ * <li>for every possible fetch status</li>
+ * <li>and zero or more (0-3) additional in-links</li>
+ * </ul>
+ * call {@literal updatedb} and check whether the resulting CrawlDb status is
+ * the expected one.
+ */
+ @Test
+ public void testCrawlDbStateTransitionMatrix() {
+ LOG.info("Test CrawlDatum state transitions");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ CrawlDbUpdateUtil<CrawlDbReducer> updateDb = new CrawlDbUpdateUtil<CrawlDbReducer>(
+ new CrawlDbReducer(), conf);
+ int retryMax = conf.getInt("db.fetch.retry.max", 3);
+ for (String sched : schedules) {
+ LOG.info("Testing state transitions with " + sched);
+ conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched);
+ FetchSchedule schedule = FetchScheduleFactory
+ .getFetchSchedule(new JobConf(conf));
+ for (int i = 0; i < fetchDbStatusPairs.length; i++) {
+ byte fromDbStatus = fetchDbStatusPairs[i][1];
+ for (int j = 0; j < fetchDbStatusPairs.length; j++) {
+ byte fetchStatus = fetchDbStatusPairs[j][0];
+ CrawlDatum fromDb = null;
+ if (fromDbStatus == -1) {
+ // nothing yet in CrawlDb
+ // CrawlDatum added by FreeGenerator or via outlink
+ } else {
+ fromDb = new CrawlDatum();
+ fromDb.setStatus(fromDbStatus);
+ // initialize fetchInterval:
+ schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fromDb);
+ }
+ // expected db status
+ byte toDbStatus = fetchDbStatusPairs[j][1];
+ if (fetchStatus == -1) {
+ if (fromDbStatus == -1) {
+ // nothing fetched yet: new document detected via outlink
+ toDbStatus = STATUS_DB_UNFETCHED;
+ } else {
+ // nothing fetched but new inlinks detected: status is unchanged
+ toDbStatus = fromDbStatus;
+ }
+ } else if (fetchStatus == STATUS_FETCH_RETRY) {
+ // a simple test of fetch_retry (without retries)
+ if (fromDb == null || fromDb.getRetriesSinceFetch() < retryMax) {
+ toDbStatus = STATUS_DB_UNFETCHED;
+ } else {
+ toDbStatus = STATUS_DB_GONE;
+ }
+ }
+ String fromDbStatusName = (fromDbStatus == -1 ? "<not in CrawlDb>"
+ : getStatusName(fromDbStatus));
+ String fetchStatusName = (fetchStatus == -1 ? "<only inlinks>" : CrawlDatum
+ .getStatusName(fetchStatus));
+ LOG.info(fromDbStatusName + " + " + fetchStatusName + " => "
+ + getStatusName(toDbStatus));
+ List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+ for (int l = 0; l <= 2; l++) { // number of additional in-links
+ CrawlDatum fetch = null;
+ if (fetchStatus == -1) {
+ // nothing fetched, need at least one in-link
+ if (l == 0) continue;
+ } else {
+ fetch = new CrawlDatum();
+ if (fromDb != null) {
+ fetch.set(fromDb);
+ } else {
+ // not yet in CrawlDb: added by FreeGenerator
+ schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fetch);
+ }
+ fetch.setStatus(fetchStatus);
+ fetch.setFetchTime(System.currentTimeMillis());
+ }
+ if (fromDb != null)
+ values.add(fromDb);
+ if (fetch != null)
+ values.add(fetch);
+ for (int n = 0; n < l; n++) {
+ values.add(linked);
+ }
+ List<CrawlDatum> res = updateDb.update(values);
+ if (res.size() != 1) {
+ fail("CrawlDb update didn't result in one single CrawlDatum per
URL");
+ continue;
+ }
+ byte status = res.get(0).getStatus();
+ if (status != toDbStatus) {
+ fail("CrawlDb update for " + fromDbStatusName + " and "
+ + fetchStatusName + " and " + l + " inlinks results in "
+ + getStatusName(status) + " (expected: "
+ + getStatusName(toDbStatus) + ")");
+ }
+ values.clear();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Test states after inject: inject must not modify the status of CrawlDatums
+ * already in CrawlDb. Newly injected elements have status "db_unfetched".
+ * Inject is simulated by calling {@link Injector.InjectReducer#reduce()}.
+ */
+ @Test
+ public void testCrawlDbStateTransitionInject() {
+ LOG.info("Test CrawlDatum states in Injector after inject");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ CrawlDbUpdateUtil<Injector.InjectReducer> inject = new
CrawlDbUpdateUtil<Injector.InjectReducer>(
+ new Injector.InjectReducer(), conf);
+ ScoringFilters scfilters = new ScoringFilters(conf);
+ for (String sched : schedules) {
+ LOG.info("Testing inject with " + sched);
+ conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched);
+ FetchSchedule schedule = FetchScheduleFactory
+ .getFetchSchedule(new JobConf(conf));
+ List<CrawlDatum> values = new ArrayList<CrawlDatum>();
+ for (int i = 0; i < fetchDbStatusPairs.length; i++) {
+ byte fromDbStatus = fetchDbStatusPairs[i][1];
+ byte toDbStatus = fromDbStatus;
+ if (fromDbStatus == -1) {
+ toDbStatus = STATUS_DB_UNFETCHED;
+ } else {
+ CrawlDatum fromDb = new CrawlDatum();
+ fromDb.setStatus(fromDbStatus);
+ schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, fromDb);
+ values.add(fromDb);
+ }
+ LOG.info("inject "
+ + (fromDbStatus == -1 ? "<not in CrawlDb>" : CrawlDatum
+ .getStatusName(fromDbStatus)) + " + "
+ + getStatusName(STATUS_INJECTED) + " => "
+ + getStatusName(toDbStatus));
+ CrawlDatum injected = new CrawlDatum(STATUS_INJECTED,
+ conf.getInt("db.fetch.interval.default", 2592000), 0.1f);
+ schedule.initializeSchedule(CrawlDbUpdateUtil.dummyURL, injected);
+ try {
+ scfilters.injectedScore(CrawlDbUpdateUtil.dummyURL, injected);
+ } catch (ScoringFilterException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ values.add(injected);
+ List<CrawlDatum> res = inject.update(values);
+ if (res.size() != 1) {
+ fail("Inject didn't result in one single CrawlDatum per URL");
+ continue;
+ }
+ byte status = res.get(0).getStatus();
+ if (status != toDbStatus) {
+ fail("Inject for "
+ + (fromDbStatus == -1 ? "" : getStatusName(fromDbStatus) + " and ")
+ + getStatusName(STATUS_INJECTED)
+ + " results in " + getStatusName(status)
+ + " (expected: " + getStatusName(toDbStatus) + ")");
+ }
+ values.clear();
+ }
+ }
+ }
+
+ /**
+ * Test status db_notmodified detected by
+ * <ul>
+ * <li>signature comparison</li>
+ * <li>or HTTP 304</li>
+ * </ul>
+ * In addition, test for all available {@link FetchSchedule} implementations
+ * whether
+ * <ul>
+ * <li>modified time is set</li>
+ * <li>re-fetch is triggered after a certain time to force the fetched
content
+ * to be in a recent segment (old segments are deleted, see comments in
+ * {@link CrawlDbReducer#reduce(Text, Iterator, OutputCollector, Reporter)}
+ * </li>
+ * </ul>
+ */
+ @Test
+ public void testCrawlDbReducerNotModified() {
+ LOG.info("Test state notmodified");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ // test not modified detected by signature comparison
+ for (String sched : schedules) {
+ String desc = "test notmodified by signature comparison + " + sched;
+ LOG.info(desc);
+ conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched);
+ ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchNotModified(conf);
+ if (!crawlUtil.run(20)) {
+ fail("failed: " + desc);
+ }
+ }
+ // test not modified detected by HTTP 304
+ for (String sched : schedules) {
+ String desc = "test notmodified by HTTP 304 + " + sched;
+ LOG.info(desc);
+ conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl."+sched);
+ ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchNotModifiedHttp304(conf);
+ if (!crawlUtil.run(20)) {
+ fail("failed: " + desc);
+ }
+ }
+ }
+
+ protected class CrawlTestFetchNotModified extends ContinuousCrawlTestUtil {
+
+ /** time of the current fetch */
+ protected long currFetchTime;
+ /** time the last fetch took place */
+ protected long lastFetchTime;
+ /** time the document was fetched first (at all or after it has been changed) */
+ protected long firstFetchTime;
+ /** state in CrawlDb before the last fetch */
+ protected byte previousDbState;
+ /** signature in CrawlDb of previous fetch */
+ protected byte[] lastSignature;
+
+ private long maxFetchInterval;
+ private FetchSchedule schedule;
+
+
+ CrawlTestFetchNotModified(Configuration conf) {
+ super(conf);
+ maxFetchInterval = conf.getLong("db.fetch.interval.max", 7776000); // default = 90 days
+ maxFetchInterval += (24*60*60); // but take one day more to avoid false alarms
+ maxFetchInterval *= 1000; // in milliseconds
+ schedule = FetchScheduleFactory.getFetchSchedule(new JobConf(conf));
+ }
+
+ @Override
+ protected boolean check(CrawlDatum result) {
+ if (lastFetchTime > 0 && (currFetchTime - lastFetchTime) > maxFetchInterval) {
+ LOG.error("last effective fetch (HTTP 200, not HTTP 304), at "
+ + new Date(lastFetchTime)
+ + ", took place more than db.fetch.interval.max time, "
+ + "segment containing fetched content may have been deleted");
+ return false;
+ }
+ switch (result.getStatus()) {
+ case STATUS_DB_NOTMODIFIED:
+ // db_notmodified is correct if the document has been fetched previously
+ // and it has not been changed since
+ if ((previousDbState == STATUS_DB_FETCHED || previousDbState == STATUS_DB_NOTMODIFIED)) {
+ if (lastSignature != null
+ && result.getSignature() != null
+ && SignatureComparator._compare(lastSignature,
+ result.getSignature()) != 0) {
+ LOG.error("document has changed (signature changed) but state is
still "
+ + getStatusName(STATUS_DB_NOTMODIFIED));
+ return false;
+ }
+ LOG.info("ok: " + result);
+ return checkModifiedTime(result, firstFetchTime);
+ }
+ LOG.warn("notmodified without previous fetch");
+ break;
+ case STATUS_DB_FETCHED:
+ if (previousDbState == STATUS_DB_UNFETCHED) {
+ LOG.info("ok (first fetch): " + result);
+ return checkModifiedTime(result, firstFetchTime);
+ } else if (lastSignature != null
+ && result.getSignature() != null
+ && SignatureComparator._compare(lastSignature,
+ result.getSignature()) != 0) {
+ LOG.info("ok (content changed): " + result);
+ // expect modified time == now
+ return checkModifiedTime(result, currFetchTime);
+ } else {
+ LOG.warn("document has not changed, db_notmodified expected");
+ }
+ break;
+ case STATUS_DB_UNFETCHED:
+ /**
+ * Status db_unfetched is possible with {@link AdaptiveFetchSchedule}
+ * because {@link CrawlDbReducer#reduce} calls
+ * {@link FetchSchedule#forceRefetch} to force a re-fetch if fetch
+ * interval grows too large.
+ */
+ if (schedule.getClass() == AdaptiveFetchSchedule.class) {
+ LOG.info("state set to unfetched by AdaptiveFetchSchedule");
+ if (result.getSignature() != null) {
+ LOG.warn("must reset signature: " + result);
+ return false;
+ }
+ LOG.info("ok: " + result);
+ firstFetchTime = 0;
+ return true;
+ }
+ }
+ LOG.warn("wrong result: " + result);
+ return false;
+ }
+
+
+ // test modified time
+ private boolean checkModifiedTime(CrawlDatum result, long modifiedTime) {
+ if (result.getModifiedTime() == 0) {
+ LOG.error("modified time not set (TODO: not set by
DefaultFetchSchedule)");
+ // TODO: return false (but DefaultFetchSchedule does not set modified
+ // time, see NUTCH-933)
+ return true;
+ } else if (modifiedTime == result.getModifiedTime()) {
+ return true;
+ }
+ LOG.error("wrong modified time: " + new Date(result.getModifiedTime())
+ + " (expected " + new Date(modifiedTime) + ")");
+ return false;
+ }
+
+ @Override
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ lastFetchTime = currFetchTime;
+ currFetchTime = currentTime;
+ previousDbState = datum.getStatus();
+ lastSignature = datum.getSignature();
+ datum = super.fetch(datum, currentTime);
+ if (firstFetchTime == 0) {
+ firstFetchTime = currFetchTime;
+ } else if ((currFetchTime - firstFetchTime) > (duration/2)) {
+ // simulate a modification after "one year"
+ changeContent();
+ firstFetchTime = currFetchTime;
+ }
+ return datum;
+ }
+ }
+
+ protected class CrawlTestFetchNotModifiedHttp304 extends CrawlTestFetchNotModified {
+
+ CrawlTestFetchNotModifiedHttp304(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) {
+ lastFetchTime = currFetchTime;
+ currFetchTime = currentTime;
+ previousDbState = datum.getStatus();
+ lastSignature = datum.getSignature();
+ int httpCode;
+ /* document is "really" fetched (no HTTP 304)
+ * - if last-modified time and signature are unset
+ * (page has not been fetched before or fetch is forced)
+ * - for test purposes, we simulate a modification after "one year"
+ */
+ if (datum.getModifiedTime() == 0 && datum.getSignature() == null
+ || (currFetchTime - firstFetchTime) > (duration/2)) {
+ firstFetchTime = currFetchTime;
+ httpCode = 200;
+ datum.setStatus(STATUS_FETCH_SUCCESS);
+ // modify content to change signature
+ changeContent();
+ } else {
+ httpCode = 304;
+ datum.setStatus(STATUS_FETCH_NOTMODIFIED);
+ }
+ LOG.info("fetched with HTTP " + httpCode + " => "
+ + getStatusName(datum.getStatus()));
+ datum.setFetchTime(currentTime);
+ return datum;
+ }
+ }
+
+ /**
+ * NUTCH-1245: a fetch_gone should always result in a db_gone.
+ * <p>
+ * Even in a long-running continuous crawl, when a gone page is
+ * re-fetched several times over time.
+ * </p>
+ */
+ @Test
+ public void testCrawlDbReducerPageGoneSchedule1() {
+ LOG.info("NUTCH-1245: test long running continuous crawl");
+ ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(
+ STATUS_FETCH_GONE, STATUS_DB_GONE);
+ if (!crawlUtil.run(20)) {
+ fail("fetch_gone did not result in a db_gone (NUTCH-1245)");
+ }
+ }
+
+ /**
+ * NUTCH-1245: a fetch_gone should always result in a db_gone.
+ * <p>
+ * Simulate a misconfiguration where db.fetch.interval.default is set to a value
+ * > (fetchIntervalMax * 1.5).
+ * </p>
+ */
+ @Test
+ public void testCrawlDbReducerPageGoneSchedule2() {
+ LOG.info("NUTCH-1245 (misconfiguration): test with
db.fetch.interval.default > (1.5 * db.fetch.interval.max)");
+ Configuration conf = CrawlDBTestUtil.createConfiguration();
+ int fetchIntervalMax = conf.getInt("db.fetch.interval.max", 0);
+ conf.setInt("db.fetch.interval.default",
+ 3 + (int) (fetchIntervalMax * 1.5));
+ ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(conf,
+ STATUS_FETCH_GONE, STATUS_DB_GONE);
+ if (!crawlUtil.run(0)) {
+ fail("fetch_gone did not result in a db_gone (NUTCH-1245)");
+ }
+ }
+
+}
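As a closing note, the transition matrix test above derives the expected CrawlDb status from the fetchDbStatusPairs table. A compact sketch of that lookup (hypothetical helper for illustration, not part of this commit):

    // return the CrawlDb status expected after a fetch with the given
    // status, or -1 if there is no direct counter-part (e.g. fetch_retry,
    // which needs the retry-counter logic shown above)
    static byte expectedDbStatus(byte fetchStatus, byte[][] pairs) {
      for (byte[] pair : pairs) {
        if (pair[0] == fetchStatus) {
          return pair[1]; // e.g. fetch_success => db_fetched
        }
      }
      return -1;
    }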
+