This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 5907604 NUTCH-2518 Cleaning up the file system after a job failure.
new 8682b96 Merge pull request #307 from Omkar20895/NUTCH-2518
5907604 is described below
commit 5907604b341f3eda1aae924606fce9022446132c
Author: Omkar20895 <[email protected]>
AuthorDate: Mon Apr 2 16:38:43 2018 +0530
NUTCH-2518 Cleaning up the file system after a job failure.
---
src/java/org/apache/nutch/crawl/CrawlDb.java | 20 +++++---
src/java/org/apache/nutch/crawl/CrawlDbMerger.java | 22 ++++++---
src/java/org/apache/nutch/crawl/CrawlDbReader.java | 57 +++++++++++++++++-----
.../org/apache/nutch/crawl/DeduplicationJob.java | 28 +++++++++--
src/java/org/apache/nutch/crawl/Generator.java | 42 ++++++++++++----
src/java/org/apache/nutch/crawl/Injector.java | 18 ++-----
src/java/org/apache/nutch/crawl/LinkDb.java | 29 ++++++++---
src/java/org/apache/nutch/crawl/LinkDbMerger.java | 15 +++++-
src/java/org/apache/nutch/crawl/LinkDbReader.java | 11 ++++-
src/java/org/apache/nutch/fetcher/Fetcher.java | 9 +++-
src/java/org/apache/nutch/hostdb/ReadHostDb.java | 2 +-
src/java/org/apache/nutch/hostdb/UpdateHostDb.java | 16 ++++--
src/java/org/apache/nutch/indexer/CleaningJob.java | 9 +++-
src/java/org/apache/nutch/indexer/IndexingJob.java | 11 ++++-
src/java/org/apache/nutch/parse/ParseSegment.java | 11 ++++-
.../org/apache/nutch/segment/SegmentMerger.java | 14 +++++-
.../org/apache/nutch/segment/SegmentReader.java | 9 +++-
src/java/org/apache/nutch/tools/FreeGenerator.java | 11 ++++-
.../apache/nutch/tools/arc/ArcSegmentCreator.java | 9 +++-
.../org/apache/nutch/tools/warc/WARCExporter.java | 13 +++--
src/java/org/apache/nutch/util/NutchJob.java | 17 +++++++
.../org/apache/nutch/util/SitemapProcessor.java | 17 ++-----
22 files changed, 292 insertions(+), 98 deletions(-)
diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java
b/src/java/org/apache/nutch/crawl/CrawlDb.java
index a545509..05fc3c6 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -129,17 +129,23 @@ public class CrawlDb extends NutchTool implements Tool {
LOG.info("CrawlDb update: Merging segment data into db.");
}
+ FileSystem fs = crawlDb.getFileSystem(getConf());
+ Path outPath = FileOutputFormat.getOutputPath(job);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Crawl job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(outPath, lock, fs);
+ throw new RuntimeException(message);
+ }
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- FileSystem fs = crawlDb.getFileSystem(getConf());
- LockUtil.removeLockFile(fs, lock);
- Path outPath = FileOutputFormat.getOutputPath(job);
- if (fs.exists(outPath))
- fs.delete(outPath, true);
+ LOG.error("Crawl job failed {}", e);
+ NutchJob.cleanupAfterFailure(outPath, lock, fs);
throw e;
}
-
CrawlDb.install(job, crawlDb);
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
index 35eca60..d8756fd 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
@@ -144,15 +144,23 @@ public class CrawlDbMerger extends Configured implements
Tool {
}
FileInputFormat.addInputPath(job, new Path(dbs[i],
CrawlDb.CURRENT_NAME));
}
+
+ Path outPath = FileOutputFormat.getOutputPath(job);
+ FileSystem fs = outPath.getFileSystem(getConf());
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CrawlDbMerger job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(outPath, lock, fs);
+ throw new RuntimeException(message);
+ }
CrawlDb.install(job, output);
- } catch (IOException e) {
- LockUtil.removeLockFile(getConf(), lock);
- Path outPath = FileOutputFormat.getOutputPath(job);
- FileSystem fs = outPath.getFileSystem(getConf());
- if (fs.exists(outPath))
- fs.delete(outPath, true);
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("CrawlDbMerge job failed {}", e);
+ NutchJob.cleanupAfterFailure(outPath, lock, fs);
throw e;
}
long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index c1a79e9..dcf5ace 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -401,15 +401,23 @@ public class CrawlDbReader extends AbstractChecker
implements Closeable {
// https://issues.apache.org/jira/browse/NUTCH-1029
config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
false);
-
+ FileSystem fileSystem = tmpFolder.getFileSystem(config);
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CrawlDbReader job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ fileSystem.delete(tmpFolder, true);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException
e) {
LOG.error(StringUtils.stringifyException(e));
+ fileSystem.delete(tmpFolder, true);
throw e;
}
// reading the result
- FileSystem fileSystem = tmpFolder.getFileSystem(config);
SequenceFile.Reader[] readers =
SegmentReaderUtil.getReaders(tmpFolder, config);
Text key = new Text();
@@ -684,8 +692,15 @@ public class CrawlDbReader extends AbstractChecker
implements Closeable {
job.setOutputValueClass(CrawlDatum.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CrawlDbReader job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
@@ -788,11 +803,21 @@ public class CrawlDbReader extends AbstractChecker
implements Closeable {
job.setOutputValueClass(Text.class);
job.getConfiguration().setFloat("db.reader.topn.min", min);
-
+
+ FileSystem fs = tempDir.getFileSystem(config);
try{
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CrawlDbReader job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ fs.delete(tempDir, true);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
+ fs.delete(tempDir, true);
throw e;
}
@@ -816,13 +841,21 @@ public class CrawlDbReader extends AbstractChecker
implements Closeable {
job.setNumReduceTasks(1); // create a single file.
try{
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CrawlDbReader job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ fs.delete(tempDir, true);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
+ fs.delete(tempDir, true);
throw e;
}
- FileSystem fs = tempDir.getFileSystem(config);
fs.delete(tempDir, true);
if (LOG.isInfoEnabled()) {
LOG.info("CrawlDb topN: done");
diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
index f2283ee..555f9e2 100644
--- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
+++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -312,8 +312,17 @@ public class DeduplicationJob extends NutchTool implements
Tool {
job.setMapperClass(DBFilter.class);
job.setReducerClass(DedupReducer.class);
+ FileSystem fs = tempDir.getFileSystem(getConf());
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Crawl job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ fs.delete(tempDir, true);
+ throw new RuntimeException(message);
+ }
CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus");
if (g != null) {
Counter counter = g.findCounter("Documents marked as duplicate");
@@ -321,8 +330,9 @@ public class DeduplicationJob extends NutchTool implements
Tool {
LOG.info("Deduplication: " + (int) dups
+ " documents marked as duplicates");
}
- } catch (final Exception e) {
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
+ fs.delete(tempDir, true);
return -1;
}
@@ -337,16 +347,24 @@ public class DeduplicationJob extends NutchTool
implements Tool {
mergeJob.setReducerClass(StatusUpdateReducer.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (final Exception e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Crawl job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ fs.delete(tempDir, true);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
+ fs.delete(tempDir, true);
return -1;
}
CrawlDb.install(mergeJob, dbPath);
// clean up
- FileSystem fs = tempDir.getFileSystem(getConf());
fs.delete(tempDir, true);
long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/crawl/Generator.java
b/src/java/org/apache/nutch/crawl/Generator.java
index c972a13..c9096d6 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -781,10 +781,18 @@ public class Generator extends NutchTool implements Tool {
MultipleOutputs.addNamedOutput(job, "sequenceFiles",
SequenceFileOutputFormat.class, FloatWritable.class, SelectorEntry.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Generator job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+ throw new RuntimeException(message);
+ }
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- LockUtil.removeLockFile(getConf(), lock);
- fs.delete(tempDir, true);
+ LOG.error("Generator job failed {}", e);
+ NutchJob.cleanupAfterFailure(tempDir, lock, fs);
throw e;
}
@@ -838,12 +846,21 @@ public class Generator extends NutchTool implements Tool {
job.setOutputValueClass(CrawlDatum.class);
FileOutputFormat.setOutputPath(job, tempDir2);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Generator job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+ NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
+ throw new RuntimeException(message);
+ }
CrawlDb.install(job, dbDir);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- LockUtil.removeLockFile(getConf(), lock);
- fs.delete(tempDir, true);
- fs.delete(tempDir2, true);
+ LOG.error("Generator job failed {}", e);
+ NutchJob.cleanupAfterFailure(tempDir, lock, fs);
+ NutchJob.cleanupAfterFailure(tempDir2, lock, fs);
throw e;
}
@@ -894,8 +911,15 @@ public class Generator extends NutchTool implements Tool {
job.setOutputValueClass(CrawlDatum.class);
job.setSortComparatorClass(HashComparator.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Generator job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
diff --git a/src/java/org/apache/nutch/crawl/Injector.java
b/src/java/org/apache/nutch/crawl/Injector.java
index d872dbf..093d483 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -40,9 +40,9 @@ import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.LockUtil;
import org.apache.nutch.service.NutchServer;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TimingUtil;
@@ -420,7 +420,7 @@ public class Injector extends NutchTool implements Tool {
+ job.getStatus().getState() + ", reason: "
+ job.getStatus().getFailureInfo();
LOG.error(message);
- cleanupAfterFailure(tempCrawlDb, lock, fs);
+ NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
// throw exception so that calling routine can exit with error
throw new RuntimeException(message);
}
@@ -463,19 +463,7 @@ public class Injector extends NutchTool implements Tool {
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("Injector job failed", e);
- cleanupAfterFailure(tempCrawlDb, lock, fs);
- throw e;
- }
- }
-
- public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs)
- throws IOException {
- try {
- if (fs.exists(tempCrawlDb)) {
- fs.delete(tempCrawlDb, true);
- }
- LockUtil.removeLockFile(fs, lock);
- } catch (IOException e) {
+ NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
throw e;
}
}
diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java
b/src/java/org/apache/nutch/crawl/LinkDb.java
index c6a32ba..37bfb7b 100644
--- a/src/java/org/apache/nutch/crawl/LinkDb.java
+++ b/src/java/org/apache/nutch/crawl/LinkDb.java
@@ -228,8 +228,17 @@ public class LinkDb extends NutchTool implements Tool {
ParseData.DIR_NAME));
}
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "LinkDb job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ LockUtil.removeLockFile(fs, lock);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("LinkDb job failed {}", e);
LockUtil.removeLockFile(fs, lock);
throw e;
}
@@ -244,10 +253,18 @@ public class LinkDb extends NutchTool implements Tool {
FileInputFormat.addInputPath(job, currentLinkDb);
FileInputFormat.addInputPath(job, newLinkDb);
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (IOException e) {
- LockUtil.removeLockFile(fs, lock);
- fs.delete(newLinkDb, true);
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "LinkDb job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(newLinkDb, lock, fs);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("LinkDb job failed {}", e);
+ NutchJob.cleanupAfterFailure(newLinkDb, lock, fs);
throw e;
}
fs.delete(newLinkDb, true);
diff --git a/src/java/org/apache/nutch/crawl/LinkDbMerger.java
b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
index c8e3943..f2f0892 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbMerger.java
@@ -121,7 +121,20 @@ public class LinkDbMerger extends Configured implements
Tool {
for (int i = 0; i < dbs.length; i++) {
FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME));
}
- int complete = job.waitForCompletion(true)?0:1;
+
+ try {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "LinkDbMerge job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("LinkDbMerge job failed {}", e);
+ throw e;
+ }
FileSystem fs = output.getFileSystem(getConf());
fs.mkdirs(output);
fs.rename(FileOutputFormat.getOutputPath(job), new Path(output,
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java
b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 8efaf0a..519fa59 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -166,8 +166,15 @@ public class LinkDbReader extends AbstractChecker
implements Closeable {
job.setOutputValueClass(Inlinks.class);
try{
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e){
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "LinkDbRead job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e){
LOG.error(StringUtils.stringifyException(e));
throw e;
}
diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java
b/src/java/org/apache/nutch/fetcher/Fetcher.java
index 34fb136..ba34d68 100644
--- a/src/java/org/apache/nutch/fetcher/Fetcher.java
+++ b/src/java/org/apache/nutch/fetcher/Fetcher.java
@@ -495,7 +495,14 @@ public class Fetcher extends NutchTool implements Tool {
job.setOutputValueClass(NutchWritable.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Fetcher job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
} catch (InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
index e53e0c3..408e3ea 100644
--- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -208,7 +208,7 @@ public class ReadHostDb extends Configured implements Tool {
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
- LOG.error("ReadHostDb job failed", e);
+ LOG.error("ReadHostDb job failed {}", e);
throw e;
}
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
index 9119c35..7209278 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -129,17 +129,23 @@ public class UpdateHostDb extends Configured implements
Tool {
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "UpdateHostDb job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ NutchJob.cleanupAfterFailure(tempHostDb, lock, fs);
+ throw new RuntimeException(message);
+ }
FSUtils.replace(fs, old, current, true);
FSUtils.replace(fs, current, tempHostDb, true);
if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
} catch (Exception e) {
- if (fs.exists(tempHostDb)) {
- fs.delete(tempHostDb, true);
- }
- LockUtil.removeLockFile(fs, lock);
+ LOG.error("UpdateHostDb job failed {}", e);
+ NutchJob.cleanupAfterFailure(tempHostDb, lock, fs);
throw e;
}
diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java
b/src/java/org/apache/nutch/indexer/CleaningJob.java
index a8ac640..3ac8b9e 100644
--- a/src/java/org/apache/nutch/indexer/CleaningJob.java
+++ b/src/java/org/apache/nutch/indexer/CleaningJob.java
@@ -162,7 +162,14 @@ public class CleaningJob implements Tool {
conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
try{
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "CleaningJob did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
} catch (InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java
b/src/java/org/apache/nutch/indexer/IndexingJob.java
index e4997cb..6757291 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -146,8 +146,15 @@ public class IndexingJob extends NutchTool implements Tool
{
FileOutputFormat.setOutputPath(job, tmp);
try {
try{
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Indexing job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java
b/src/java/org/apache/nutch/parse/ParseSegment.java
index 61aa997..1d64463 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -259,8 +259,15 @@ public class ParseSegment extends NutchTool implements
Tool {
job.setOutputValueClass(ParseImpl.class);
try{
- int complete = job.waitForCompletion(true)?0:1;
- } catch (InterruptedException | ClassNotFoundException e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "Parse job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error(StringUtils.stringifyException(e));
throw e;
}
diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java
b/src/java/org/apache/nutch/segment/SegmentMerger.java
index 780e10a..188ae69 100644
--- a/src/java/org/apache/nutch/segment/SegmentMerger.java
+++ b/src/java/org/apache/nutch/segment/SegmentMerger.java
@@ -738,7 +738,19 @@ public class SegmentMerger extends Configured implements
Tool{
setConf(conf);
- int complete = job.waitForCompletion(true)?0:1;
+ try {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "SegmentMerger job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("SegmentMerger job failed {}", e);
+ throw e;
+ }
}
/**
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java
b/src/java/org/apache/nutch/segment/SegmentReader.java
index 28b88cd..0b65a2b 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -244,7 +244,14 @@ public class SegmentReader extends Configured implements
Tool {
job.setOutputValueClass(NutchWritable.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "SegmentReader job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
} catch (IOException | InterruptedException | ClassNotFoundException e ){
LOG.error(StringUtils.stringifyException(e));
throw e;
diff --git a/src/java/org/apache/nutch/tools/FreeGenerator.java
b/src/java/org/apache/nutch/tools/FreeGenerator.java
index 3b01bb4..ab5109e 100644
--- a/src/java/org/apache/nutch/tools/FreeGenerator.java
+++ b/src/java/org/apache/nutch/tools/FreeGenerator.java
@@ -201,8 +201,15 @@ public class FreeGenerator extends Configured implements
Tool {
FileOutputFormat.setOutputPath(job, new Path(args[1], new Path(segName,
CrawlDatum.GENERATE_DIR_NAME)));
try {
- int complete = job.waitForCompletion(true)?0:1;
- } catch (Exception e) {
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "FreeGenerator job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("FAILED: " + StringUtils.stringifyException(e));
return -1;
}
diff --git a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
index 1f9e660..499b246 100644
--- a/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
+++ b/src/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
@@ -395,7 +395,14 @@ public class ArcSegmentCreator extends Configured
implements Tool {
job.setOutputValueClass(NutchWritable.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "ArcSegmentCreator job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
} catch (IOException | InterruptedException | ClassNotFoundException e){
LOG.error(StringUtils.stringifyException(e));
throw e;
diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
index aae8064..2921a97 100644
--- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
+++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -285,13 +285,20 @@ public class WARCExporter extends Configured implements
Tool {
job.setOutputValueClass(WARCWritable.class);
try {
- int complete = job.waitForCompletion(true)?0:1;
+ boolean success = job.waitForCompletion(true);
+ if (!success) {
+ String message = "WARCExporter job did not succeed, job status:"
+ + job.getStatus().getState() + ", reason: "
+ + job.getStatus().getFailureInfo();
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
LOG.info(job.getCounters().toString());
long end = System.currentTimeMillis();
LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
TimingUtil.elapsedTime(start, end));
- } catch (Exception e) {
- LOG.error("Exception caught", e);
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
+ LOG.error("WARCExporter job failed {}", e);
return -1;
}
diff --git a/src/java/org/apache/nutch/util/NutchJob.java
b/src/java/org/apache/nutch/util/NutchJob.java
index 34a9acd..06f1cc2 100644
--- a/src/java/org/apache/nutch/util/NutchJob.java
+++ b/src/java/org/apache/nutch/util/NutchJob.java
@@ -19,6 +19,8 @@ package org.apache.nutch.util;
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
@@ -34,4 +36,19 @@ public class NutchJob extends Job {
return Job.getInstance(conf);
}
+ /**
+ * Deletes tempDir recursively if it exists, then calls LockUtil.removeLockFile for lock; any IOException is rethrown.
+ */
+ public static void cleanupAfterFailure(Path tempDir, Path lock, FileSystem
fs)
+ throws IOException {
+ try {
+ if (fs.exists(tempDir)) {
+ fs.delete(tempDir, true); // second argument true: recursive delete
+ }
+ LockUtil.removeLockFile(fs, lock); // not reached if exists/delete above throws
+ } catch (IOException e) {
+ throw e; // rethrown unchanged; this catch adds no handling of its own
+ }
+ }
+
}
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java
b/src/java/org/apache/nutch/util/SitemapProcessor.java
index 380ac07..70f4372 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -52,6 +52,7 @@ import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.NutchJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -383,7 +384,7 @@ public class SitemapProcessor extends Configured implements
Tool {
+ " job did not succeed, job status: " + job.getStatus().getState()
+ ", reason: " + job.getStatus().getFailureInfo();
LOG.error(message);
- cleanupAfterFailure(tempCrawlDb, lock, fs);
+ NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
// throw exception so that calling routine can exit with error
throw new RuntimeException(message);
}
@@ -415,19 +416,7 @@ public class SitemapProcessor extends Configured
implements Tool {
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("SitemapProcessor_" + crawldb.toString(), e);
- cleanupAfterFailure(tempCrawlDb, lock, fs);
- throw e;
- }
- }
-
- public void cleanupAfterFailure(Path tempCrawlDb, Path lock, FileSystem fs)
- throws IOException {
- try {
- if (fs.exists(tempCrawlDb)) {
- fs.delete(tempCrawlDb, true);
- }
- LockUtil.removeLockFile(fs, lock);
- } catch (IOException e) {
+ NutchJob.cleanupAfterFailure(tempCrawlDb, lock, fs);
throw e;
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].