Repository: nutch Updated Branches: refs/heads/master b62f43fda -> d6bcefd92
NUTCH-2250 : CommonCrawlDumper : Invalid format and skipped parts + Reads all parts of segments + FIX : writes only one document to dump file Project: http://git-wip-us.apache.org/repos/asf/nutch/repo Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/47cc4e27 Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/47cc4e27 Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/47cc4e27 Branch: refs/heads/master Commit: 47cc4e27a41a78753ed431685010da1987bca269 Parents: b62f43f Author: Thamme Gowda <[email protected]> Authored: Thu Apr 14 02:38:30 2016 -0700 Committer: Thamme Gowda <[email protected]> Committed: Thu Apr 14 02:38:30 2016 -0700 ---------------------------------------------------------------------- .../nutch/tools/CommonCrawlDataDumper.java | 82 ++++++++------------ .../apache/nutch/tools/CommonCrawlFormat.java | 3 +- 2 files changed, 33 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nutch/blob/47cc4e27/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java b/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java index 87d1db8..d00df0a 100644 --- a/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java +++ b/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java @@ -23,7 +23,6 @@ import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileFilter; import java.io.FileOutputStream; import java.io.IOException; import java.net.MalformedURLException; @@ -33,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.cli.CommandLine; @@ -54,7 +54,9 @@ import 
org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; @@ -238,66 +240,42 @@ public class CommonCrawlDataDumper extends Configured implements Tool { Map<String, Integer> filteredCounts = new HashMap<String, Integer>(); Configuration nutchConfig = NutchConfiguration.create(); - FileSystem fs = FileSystem.get(nutchConfig); - File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.canRead() && file.isDirectory(); + final FileSystem fs = FileSystem.get(nutchConfig); + Path segmentRootPath = new Path(segmentRootDir.toString()); + + //get all paths + List<Path> parts = new ArrayList<>(); + RemoteIterator<LocatedFileStatus> files = fs.listFiles(segmentRootPath, true); + String partPattern = ".*" + File.separator + Content.DIR_NAME + File.separator + "part-[0-9]{5}" + File.separator + "data"; + while (files.hasNext()) { + LocatedFileStatus next = files.next(); + if (next.isFile()) { + Path path = next.getPath(); + if (path.toString().matches(partPattern)){ + parts.add(path); + } } - }); - - if (new File( - segmentRootDir.getAbsolutePath() + File.separator + Content.DIR_NAME - + "/part-00000/data").exists()) { - segmentDirs = new File[] { segmentRootDir }; } - if (segmentDirs == null) { - LOG.error( - "No segment directories found in [" + segmentRootDir.getAbsolutePath() - + "]"); + if (parts == null || parts.size() == 0) { + LOG.error( "No segment directories found in [ {}] ", segmentRootDir.getAbsolutePath()); System.exit(1); } - + LOG.info("Found {} segment parts", parts.size()); if (gzip && !warc) { - fileList = new ArrayList<String>(); + fileList 
= new ArrayList<>(); constructNewStream(outputDir); } - CommonCrawlFormat format = CommonCrawlFormatFactory - .getCommonCrawlFormat("JACKSON", nutchConfig, config); - - if (warc) { - format = CommonCrawlFormatFactory - .getCommonCrawlFormat("WARC", nutchConfig, config); - } - - for (File segment : segmentDirs) { - LOG.info("Processing segment: [" + segment.getAbsolutePath() + "]"); + for (Path segmentPart : parts) { + LOG.info("Processing segment Part : [ {} ]", segmentPart); try { - String segmentContentPath = - segment.getAbsolutePath() + File.separator + Content.DIR_NAME - + "/part-00000/data"; - Path file = new Path(segmentContentPath); - - if (!new File(file.toString()).exists()) { - LOG.warn("Skipping segment: [" + segmentContentPath - + "]: no data directory present"); - continue; - } SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig, - SequenceFile.Reader.file(file)); - - if (!new File(file.toString()).exists()) { - LOG.warn("Skipping segment: [" + segmentContentPath - + "]: no data directory present"); - continue; - } + SequenceFile.Reader.file(segmentPart)); Writable key = (Writable) reader.getKeyClass().newInstance(); Content content = null; - while (reader.next(key)) { content = new Content(); reader.getCurrentValue(content); @@ -368,7 +346,11 @@ public class CommonCrawlDataDumper extends Configured implements Tool { String mimeType = new Tika().detect(content.getContent()); // Maps file to JSON-based structure - jsonData = format.getJsonData(url, content, metadata); + //TODO: Make this Jackson Format implementation reusable + try (CommonCrawlFormat format = CommonCrawlFormatFactory + .getCommonCrawlFormat(warc ? 
"WARC" : "JACKSON", nutchConfig, config)) { + jsonData = format.getJsonData(url, content, metadata); + } collectStats(typeCounts, mimeType); // collects statistics for the given mimetypes @@ -415,16 +397,14 @@ public class CommonCrawlDataDumper extends Configured implements Tool { } } } - reader.close(); + } catch (Exception e){ + LOG.warn("SKIPPED: {} Because : {}", segmentPart, e.getMessage()); } finally { fs.close(); } } - // close the format if needed - format.close(); - if (gzip && !warc) { closeStream(); } http://git-wip-us.apache.org/repos/asf/nutch/blob/47cc4e27/src/java/org/apache/nutch/tools/CommonCrawlFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/tools/CommonCrawlFormat.java b/src/java/org/apache/nutch/tools/CommonCrawlFormat.java index ec19027..87baeb5 100644 --- a/src/java/org/apache/nutch/tools/CommonCrawlFormat.java +++ b/src/java/org/apache/nutch/tools/CommonCrawlFormat.java @@ -21,6 +21,7 @@ import org.apache.nutch.metadata.Metadata; import org.apache.nutch.parse.ParseData; import org.apache.nutch.protocol.Content; +import java.io.Closeable; import java.io.IOException; /** @@ -30,7 +31,7 @@ import java.io.IOException; * @author gtotaro * */ -public interface CommonCrawlFormat { +public interface CommonCrawlFormat extends Closeable { /** *
