http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java new file mode 100644 index 0000000..ef12f52 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java @@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.segment; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.TreeMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.MapFile.Writer.Option; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Metadata; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.SequenceFileRecordReader; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.Generator; +import org.apache.nutch.metadata.MetaWrapper; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** + * This tool takes 
several segments and merges their data together. Only the
+ * latest versions of the data are retained.
+ * <p>
+ * Optionally, you can apply current URLFilters to remove prohibited URLs.
+ * </p>
+ * <p>
+ * Also, it's possible to slice the resulting segment into chunks of fixed size.
+ * </p>
+ * <h3>Important Notes</h3>
+ * <h4>Which parts are merged?</h4>
+ * <p>
+ * It doesn't make sense to merge data from segments that are at different
+ * stages of processing (e.g. one unfetched segment, one fetched but not parsed,
+ * and one fetched and parsed). Therefore, prior to merging, the tool will
+ * determine the lowest common set of input data, and only this data will be
+ * merged. This may have some unintended consequences: e.g. if the majority of
+ * input segments are fetched and parsed, but one of them is unfetched, the
+ * tool will fall back to just merging fetchlists, and it will skip all other
+ * data from all segments.
+ * </p>
+ * <h4>Merging fetchlists</h4>
+ * <p>
+ * Merging segments that contain just fetchlists (i.e. prior to fetching) is
+ * not recommended, because this tool (unlike
+ * {@link org.apache.nutch.crawl.Generator}) doesn't ensure that fetchlist
+ * parts for each map task are disjoint.
+ * </p>
+ * <h4>Duplicate content</h4>
+ * <p>
+ * Merging segments removes older content whenever possible (see below).
+ * However, this is NOT the same as de-duplication, which in addition removes
+ * identical content found at different URLs. In other words, running
+ * DeleteDuplicates is still necessary.
+ * </p>
+ * <p>
+ * For some types of data (especially ParseText) it's not possible to determine
+ * which version is really older. Therefore the tool always uses segment names
+ * as timestamps, for all types of input data. Segment names are compared in
+ * forward lexicographic order (0-9a-zA-Z), and data from segments with "higher"
+ * names will prevail. It follows that it is extremely important that segments
+ * be named in increasing lexicographic order as their creation time increases.
+ * </p>
+ * <h4>Merging and indexes</h4>
+ * <p>
+ * The merged segment gets a different name. Since the Indexer embeds segment
+ * names in indexes, any indexes originally created for the input segments will
+ * NOT work with the merged segment. Newly created merged segment(s) need to be
+ * indexed afresh. This tool doesn't use existing indexes in any way, so if you
+ * plan to merge segments you don't have to index them prior to merging.
+ * </p>
+ *
+ * @author Andrzej Bialecki
+ */
+public class SegmentMerger extends Configured implements Tool,
+    Mapper<Text, MetaWrapper, Text, MetaWrapper>,
+    Reducer<Text, MetaWrapper, Text, MetaWrapper> {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMerger.class);
+
+  private static final String SEGMENT_PART_KEY = "part";
+  private static final String SEGMENT_SLICE_KEY = "slice";
+
+  private URLFilters filters = null;
+  private URLNormalizers normalizers = null;
+  private SegmentMergeFilters mergeFilters = null;
+  private long sliceSize = -1;
+  private long curCount = 0;
+
+  /**
+   * Wraps inputs in a {@link MetaWrapper}, to permit merging different types
+   * in the reduce step and to carry additional metadata.
+ */ + public static class ObjectInputFormat extends + SequenceFileInputFormat<Text, MetaWrapper> { + + @Override + public RecordReader<Text, MetaWrapper> getRecordReader( + final InputSplit split, final JobConf job, Reporter reporter) + throws IOException { + + reporter.setStatus(split.toString()); + + // find part name + SegmentPart segmentPart; + final String spString; + final FileSplit fSplit = (FileSplit) split; + try { + segmentPart = SegmentPart.get(fSplit); + spString = segmentPart.toString(); + } catch (IOException e) { + throw new RuntimeException("Cannot identify segment:", e); + } + + SequenceFile.Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(fSplit.getPath())); + + final Writable w; + try { + w = (Writable) reader.getValueClass().newInstance(); + } catch (Exception e) { + throw new IOException(e.toString()); + } finally { + try { + reader.close(); + } catch (Exception e) { + // ignore + } + } + final SequenceFileRecordReader<Text, Writable> splitReader = new SequenceFileRecordReader<Text, Writable>( + job, (FileSplit) split); + + try { + return new SequenceFileRecordReader<Text, MetaWrapper>(job, fSplit) { + + public synchronized boolean next(Text key, MetaWrapper wrapper) + throws IOException { + LOG.debug("Running OIF.next()"); + + boolean res = splitReader.next(key, w); + wrapper.set(w); + wrapper.setMeta(SEGMENT_PART_KEY, spString); + return res; + } + + @Override + public synchronized void close() throws IOException { + splitReader.close(); + } + + @Override + public MetaWrapper createValue() { + return new MetaWrapper(); + } + + }; + } catch (IOException e) { + throw new RuntimeException("Cannot create RecordReader: ", e); + } + } + } + + public static class SegmentOutputFormat extends + FileOutputFormat<Text, MetaWrapper> { + private static final String DEFAULT_SLICE = "default"; + + @Override + public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs, + final JobConf job, final String name, final Progressable progress) + throws IOException { + return new RecordWriter<Text, MetaWrapper>() { + MapFile.Writer c_out = null; + MapFile.Writer f_out = null; + MapFile.Writer pd_out = null; + MapFile.Writer pt_out = null; + SequenceFile.Writer g_out = null; + SequenceFile.Writer p_out = null; + HashMap<String, Closeable> sliceWriters = new HashMap<String, Closeable>(); + String segmentName = job.get("segment.merger.segmentName"); + + public void write(Text key, MetaWrapper wrapper) throws IOException { + // unwrap + SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY)); + Writable o = wrapper.get(); + String slice = wrapper.getMeta(SEGMENT_SLICE_KEY); + if (o instanceof CrawlDatum) { + if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) { + g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME); + g_out.append(key, o); + } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) { + f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, + CrawlDatum.class); + f_out.append(key, o); + } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) { + p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME); + p_out.append(key, o); + } else { + throw new IOException("Cannot determine segment part: " + + sp.partName); + } + } else if (o instanceof Content) { + c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class); + c_out.append(key, o); + } else if (o instanceof ParseData) { + // update the segment name inside contentMeta - required by Indexer + if (slice == null) { + ((ParseData) 
o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, + segmentName); + } else { + ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, + segmentName + "-" + slice); + } + pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class); + pd_out.append(key, o); + } else if (o instanceof ParseText) { + pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class); + pt_out.append(key, o); + } + } + + // lazily create SequenceFile-s. + private SequenceFile.Writer ensureSequenceFile(String slice, + String dirName) throws IOException { + if (slice == null) + slice = DEFAULT_SLICE; + SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters + .get(slice + dirName); + if (res != null) + return res; + Path wname; + Path out = FileOutputFormat.getOutputPath(job); + if (slice == DEFAULT_SLICE) { + wname = new Path(new Path(new Path(out, segmentName), dirName), + name); + } else { + wname = new Path(new Path(new Path(out, segmentName + "-" + slice), + dirName), name); + } + +// Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class); +// org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class); +// Option rProgressOpt = (Option) SequenceFile.Writer.progressable(progress); +// Option rCompOpt = (Option) SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job)); +// Option rFileOpt = (Option) SequenceFile.Writer.file(wname); + + //res = SequenceFile.createWriter(job, rFileOpt, rKeyClassOpt, + // rValClassOpt, rCompOpt, rProgressOpt); + + res = SequenceFile.createWriter(job, SequenceFile.Writer.file(wname), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(CrawlDatum.class), + SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)), + SequenceFile.Writer.replication(fs.getDefaultReplication(wname)), + SequenceFile.Writer.blockSize(1073741824), + SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job), new DefaultCodec()), + SequenceFile.Writer.progressable(progress), + SequenceFile.Writer.metadata(new Metadata())); + + sliceWriters.put(slice + dirName, res); + return res; + } + + // lazily create MapFile-s. + private MapFile.Writer ensureMapFile(String slice, String dirName, + Class<? 
extends Writable> clazz) throws IOException { + if (slice == null) + slice = DEFAULT_SLICE; + MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice + + dirName); + if (res != null) + return res; + Path wname; + Path out = FileOutputFormat.getOutputPath(job); + if (slice == DEFAULT_SLICE) { + wname = new Path(new Path(new Path(out, segmentName), dirName), + name); + } else { + wname = new Path(new Path(new Path(out, segmentName + "-" + slice), + dirName), name); + } + CompressionType compType = SequenceFileOutputFormat + .getOutputCompressionType(job); + if (clazz.isAssignableFrom(ParseText.class)) { + compType = CompressionType.RECORD; + } + + Option rKeyClassOpt = (Option) MapFile.Writer.keyClass(Text.class); + org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(clazz); + org.apache.hadoop.io.SequenceFile.Writer.Option rProgressOpt = SequenceFile.Writer.progressable(progress); + org.apache.hadoop.io.SequenceFile.Writer.Option rCompOpt = SequenceFile.Writer.compression(compType); + + res = new MapFile.Writer(job, wname, rKeyClassOpt, + rValClassOpt, rCompOpt, rProgressOpt); + sliceWriters.put(slice + dirName, res); + return res; + } + + public void close(Reporter reporter) throws IOException { + Iterator<Closeable> it = sliceWriters.values().iterator(); + while (it.hasNext()) { + Object o = it.next(); + if (o instanceof SequenceFile.Writer) { + ((SequenceFile.Writer) o).close(); + } else { + ((MapFile.Writer) o).close(); + } + } + } + }; + } + } + + public SegmentMerger() { + super(null); + } + + public SegmentMerger(Configuration conf) { + super(conf); + } + + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) + return; + if (conf.getBoolean("segment.merger.filter", false)) { + filters = new URLFilters(conf); + mergeFilters = new SegmentMergeFilters(conf); + } + if (conf.getBoolean("segment.merger.normalizer", false)) + normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT); + sliceSize = conf.getLong("segment.merger.slice", -1); + if ((sliceSize > 0) && (LOG.isInfoEnabled())) { + LOG.info("Slice size: " + sliceSize + " URLs."); + } + } + + public void close() throws IOException { + } + + public void configure(JobConf conf) { + setConf(conf); + if (sliceSize > 0) { + sliceSize = sliceSize / conf.getNumReduceTasks(); + } + } + + private Text newKey = new Text(); + + public void map(Text key, MetaWrapper value, + OutputCollector<Text, MetaWrapper> output, Reporter reporter) + throws IOException { + String url = key.toString(); + if (normalizers != null) { + try { + url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // normalize + // the + // url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e.getMessage()); + url = null; + } + } + if (url != null && filters != null) { + try { + url = filters.filter(url); + } catch (Exception e) { + LOG.warn("Skipping key " + url + ": " + e.getMessage()); + url = null; + } + } + if (url != null) { + newKey.set(url); + output.collect(newKey, value); + } + } + + /** + * NOTE: in selecting the latest version we rely exclusively on the segment + * name (not all segment data contain time information). Therefore it is + * extremely important that segments be named in an increasing lexicographic + * order as their creation time increases. 
+ */ + public void reduce(Text key, Iterator<MetaWrapper> values, + OutputCollector<Text, MetaWrapper> output, Reporter reporter) + throws IOException { + CrawlDatum lastG = null; + CrawlDatum lastF = null; + CrawlDatum lastSig = null; + Content lastC = null; + ParseData lastPD = null; + ParseText lastPT = null; + String lastGname = null; + String lastFname = null; + String lastSigname = null; + String lastCname = null; + String lastPDname = null; + String lastPTname = null; + TreeMap<String, ArrayList<CrawlDatum>> linked = new TreeMap<String, ArrayList<CrawlDatum>>(); + while (values.hasNext()) { + MetaWrapper wrapper = values.next(); + Object o = wrapper.get(); + String spString = wrapper.getMeta(SEGMENT_PART_KEY); + if (spString == null) { + throw new IOException("Null segment part, key=" + key); + } + SegmentPart sp = SegmentPart.parse(spString); + if (o instanceof CrawlDatum) { + CrawlDatum val = (CrawlDatum) o; + // check which output dir it belongs to + if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) { + if (lastG == null) { + lastG = val; + lastGname = sp.segmentName; + } else { + // take newer + if (lastGname.compareTo(sp.segmentName) < 0) { + lastG = val; + lastGname = sp.segmentName; + } + } + } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) { + // only consider fetch status and ignore fetch retry status + // https://issues.apache.org/jira/browse/NUTCH-1520 + // https://issues.apache.org/jira/browse/NUTCH-1113 + if (CrawlDatum.hasFetchStatus(val) + && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY + && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) { + if (lastF == null) { + lastF = val; + lastFname = sp.segmentName; + } else { + if (lastFname.compareTo(sp.segmentName) < 0) { + lastF = val; + lastFname = sp.segmentName; + } + } + } + } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) { + if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) { + if (lastSig == null) { + lastSig = val; + lastSigname = sp.segmentName; + } else { + // take newer + if (lastSigname.compareTo(sp.segmentName) < 0) { + lastSig = val; + lastSigname = sp.segmentName; + } + } + continue; + } + // collect all LINKED values from the latest segment + ArrayList<CrawlDatum> segLinked = linked.get(sp.segmentName); + if (segLinked == null) { + segLinked = new ArrayList<CrawlDatum>(); + linked.put(sp.segmentName, segLinked); + } + segLinked.add(val); + } else { + throw new IOException("Cannot determine segment part: " + sp.partName); + } + } else if (o instanceof Content) { + if (lastC == null) { + lastC = (Content) o; + lastCname = sp.segmentName; + } else { + if (lastCname.compareTo(sp.segmentName) < 0) { + lastC = (Content) o; + lastCname = sp.segmentName; + } + } + } else if (o instanceof ParseData) { + if (lastPD == null) { + lastPD = (ParseData) o; + lastPDname = sp.segmentName; + } else { + if (lastPDname.compareTo(sp.segmentName) < 0) { + lastPD = (ParseData) o; + lastPDname = sp.segmentName; + } + } + } else if (o instanceof ParseText) { + if (lastPT == null) { + lastPT = (ParseText) o; + lastPTname = sp.segmentName; + } else { + if (lastPTname.compareTo(sp.segmentName) < 0) { + lastPT = (ParseText) o; + lastPTname = sp.segmentName; + } + } + } + } + // perform filtering based on full merge record + if (mergeFilters != null + && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD, + lastPT, linked.isEmpty() ? 
null : linked.lastEntry().getValue())) { + return; + } + + curCount++; + String sliceName = null; + MetaWrapper wrapper = new MetaWrapper(); + if (sliceSize > 0) { + sliceName = String.valueOf(curCount / sliceSize); + wrapper.setMeta(SEGMENT_SLICE_KEY, sliceName); + } + SegmentPart sp = new SegmentPart(); + // now output the latest values + if (lastG != null) { + wrapper.set(lastG); + sp.partName = CrawlDatum.GENERATE_DIR_NAME; + sp.segmentName = lastGname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (lastF != null) { + wrapper.set(lastF); + sp.partName = CrawlDatum.FETCH_DIR_NAME; + sp.segmentName = lastFname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (lastSig != null) { + wrapper.set(lastSig); + sp.partName = CrawlDatum.PARSE_DIR_NAME; + sp.segmentName = lastSigname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (lastC != null) { + wrapper.set(lastC); + sp.partName = Content.DIR_NAME; + sp.segmentName = lastCname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (lastPD != null) { + wrapper.set(lastPD); + sp.partName = ParseData.DIR_NAME; + sp.segmentName = lastPDname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (lastPT != null) { + wrapper.set(lastPT); + sp.partName = ParseText.DIR_NAME; + sp.segmentName = lastPTname; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + output.collect(key, wrapper); + } + if (linked.size() > 0) { + String name = linked.lastKey(); + sp.partName = CrawlDatum.PARSE_DIR_NAME; + sp.segmentName = name; + wrapper.setMeta(SEGMENT_PART_KEY, sp.toString()); + ArrayList<CrawlDatum> segLinked = linked.get(name); + for (int i = 0; i < segLinked.size(); i++) { + CrawlDatum link = segLinked.get(i); + wrapper.set(link); + output.collect(key, wrapper); + } + } + } + + public void merge(Path out, Path[] segs, boolean filter, boolean normalize, + long slice) throws Exception { + String segmentName = Generator.generateSegmentName(); + if (LOG.isInfoEnabled()) { + LOG.info("Merging " + segs.length + " segments to " + out + "/" + + segmentName); + } + JobConf job = new NutchJob(getConf()); + job.setJobName("mergesegs " + out + "/" + segmentName); + job.setBoolean("segment.merger.filter", filter); + job.setBoolean("segment.merger.normalizer", normalize); + job.setLong("segment.merger.slice", slice); + job.set("segment.merger.segmentName", segmentName); + FileSystem fs = FileSystem.get(getConf()); + // prepare the minimal common set of input dirs + boolean g = true; + boolean f = true; + boolean p = true; + boolean c = true; + boolean pd = true; + boolean pt = true; + + // These contain previous values, we use it to track changes in the loop + boolean pg = true; + boolean pf = true; + boolean pp = true; + boolean pc = true; + boolean ppd = true; + boolean ppt = true; + for (int i = 0; i < segs.length; i++) { + if (!fs.exists(segs[i])) { + if (LOG.isWarnEnabled()) { + LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping."); + } + segs[i] = null; + continue; + } + if (LOG.isInfoEnabled()) { + LOG.info("SegmentMerger: adding " + segs[i]); + } + Path cDir = new Path(segs[i], Content.DIR_NAME); + Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME); + Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME); + Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME); + Path pdDir = new Path(segs[i], ParseData.DIR_NAME); + Path 
ptDir = new Path(segs[i], ParseText.DIR_NAME); + c = c && fs.exists(cDir); + g = g && fs.exists(gDir); + f = f && fs.exists(fDir); + p = p && fs.exists(pDir); + pd = pd && fs.exists(pdDir); + pt = pt && fs.exists(ptDir); + + // Input changed? + if (g != pg || f != pf || p != pp || c != pc || pd != ppd || pt != ppt) { + LOG.info(segs[i] + " changed input dirs"); + } + + pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt; + } + StringBuffer sb = new StringBuffer(); + if (c) + sb.append(" " + Content.DIR_NAME); + if (g) + sb.append(" " + CrawlDatum.GENERATE_DIR_NAME); + if (f) + sb.append(" " + CrawlDatum.FETCH_DIR_NAME); + if (p) + sb.append(" " + CrawlDatum.PARSE_DIR_NAME); + if (pd) + sb.append(" " + ParseData.DIR_NAME); + if (pt) + sb.append(" " + ParseText.DIR_NAME); + if (LOG.isInfoEnabled()) { + LOG.info("SegmentMerger: using segment data from:" + sb.toString()); + } + for (int i = 0; i < segs.length; i++) { + if (segs[i] == null) + continue; + if (g) { + Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME); + FileInputFormat.addInputPath(job, gDir); + } + if (c) { + Path cDir = new Path(segs[i], Content.DIR_NAME); + FileInputFormat.addInputPath(job, cDir); + } + if (f) { + Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME); + FileInputFormat.addInputPath(job, fDir); + } + if (p) { + Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME); + FileInputFormat.addInputPath(job, pDir); + } + if (pd) { + Path pdDir = new Path(segs[i], ParseData.DIR_NAME); + FileInputFormat.addInputPath(job, pdDir); + } + if (pt) { + Path ptDir = new Path(segs[i], ParseText.DIR_NAME); + FileInputFormat.addInputPath(job, ptDir); + } + } + job.setInputFormat(ObjectInputFormat.class); + job.setMapperClass(SegmentMerger.class); + job.setReducerClass(SegmentMerger.class); + FileOutputFormat.setOutputPath(job, out); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(MetaWrapper.class); + job.setOutputFormat(SegmentOutputFormat.class); + + setConf(job); + + JobClient.runJob(job); + } + + /** + * @param args + */ + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) 
[-filter] [-slice NNNN]"); + System.err + .println("\toutput_dir\tname of the parent dir for output segment slice(s)"); + System.err + .println("\t-dir segments\tparent dir containing several segments"); + System.err.println("\tseg1 seg2 ...\tlist of segment dirs"); + System.err + .println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters"); + System.err + .println("\t-normalize\t\tnormalize URL via current URLNormalizers"); + System.err + .println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs"); + return -1; + } + Configuration conf = NutchConfiguration.create(); + final FileSystem fs = FileSystem.get(conf); + Path out = new Path(args[0]); + ArrayList<Path> segs = new ArrayList<Path>(); + long sliceSize = 0; + boolean filter = false; + boolean normalize = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dir")) { + FileStatus[] fstats = fs.listStatus(new Path(args[++i]), + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] files = HadoopFSUtil.getPaths(fstats); + for (int j = 0; j < files.length; j++) + segs.add(files[j]); + } else if (args[i].equals("-filter")) { + filter = true; + } else if (args[i].equals("-normalize")) { + normalize = true; + } else if (args[i].equals("-slice")) { + sliceSize = Long.parseLong(args[++i]); + } else { + segs.add(new Path(args[i])); + } + } + if (segs.size() == 0) { + System.err.println("ERROR: No input segments."); + return -1; + } + + merge(out, segs.toArray(new Path[segs.size()]), filter, normalize, + sliceSize); + return 0; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(NutchConfiguration.create(), + new SegmentMerger(), args); + System.exit(result); + } + +}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
new file mode 100644
index 0000000..84247e4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Utility class for handling information about segment parts.
+ *
+ * @author Andrzej Bialecki
+ */
+public class SegmentPart {
+  /** Name of the segment (just the last path component). */
+  public String segmentName;
+  /** Name of the segment part (i.e. one of the subdirectories inside a segment). */
+  public String partName;
+
+  public SegmentPart() {
+
+  }
+
+  public SegmentPart(String segmentName, String partName) {
+    this.segmentName = segmentName;
+    this.partName = partName;
+  }
+
+  /**
+   * Return a String representation of this class, in the form
+   * "segmentName/partName".
+   */
+  public String toString() {
+    return segmentName + "/" + partName;
+  }
+
+  /**
+   * Create SegmentPart from a FileSplit.
+   *
+   * @param split a {@link FileSplit} from a location inside a segment part
+   * @return a {@link SegmentPart} derived from the {@link FileSplit}.
+   * @throws IOException if the split path cannot be parsed
+   */
+  public static SegmentPart get(FileSplit split) throws IOException {
+    return get(split.getPath().toString());
+  }
+
+  /**
+   * Create SegmentPart from a full path of a location inside any segment part.
+   *
+   * @param path
+   *          full path into a segment part (may include "part-xxxxx"
+   *          components)
+   * @return SegmentPart instance describing this part.
+   * @throws IOException
+   *           if any required path components are missing.
+   */
+  public static SegmentPart get(String path) throws IOException {
+    // find part name
+    String dir = path.replace('\\', '/');
+    int idx = dir.lastIndexOf("/part-");
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    dir = dir.substring(0, idx);
+    idx = dir.lastIndexOf('/');
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    String part = dir.substring(idx + 1);
+    // find segment name
+    dir = dir.substring(0, idx);
+    idx = dir.lastIndexOf('/');
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment name: " + dir);
+    }
+    String segment = dir.substring(idx + 1);
+    return new SegmentPart(segment, part);
+  }
+
+  /**
+   * Create SegmentPart from a String in the format "segmentName/partName".
+ * + * @param string + * input String + * @return parsed instance of SegmentPart + * @throws IOException + * if "/" is missing. + */ + public static SegmentPart parse(String string) throws IOException { + int idx = string.indexOf('/'); + if (idx == -1) { + throw new IOException("Invalid SegmentPart: '" + string + "'"); + } + String segment = string.substring(0, idx); + String part = string.substring(idx + 1); + return new SegmentPart(segment, part); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java new file mode 100644 index 0000000..d00d1e2 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java @@ -0,0 +1,719 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.segment; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.Writer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.parse.ParseData; 
+import org.apache.nutch.parse.ParseText; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +/** Dump the content of a segment. */ +public class SegmentReader extends Configured implements + Reducer<Text, NutchWritable, Text, Text> { + + public static final Logger LOG = LoggerFactory.getLogger(SegmentReader.class); + + long recNo = 0L; + + private boolean co, fe, ge, pa, pd, pt; + private FileSystem fs; + + public static class InputCompatMapper extends MapReduceBase implements + Mapper<WritableComparable<?>, Writable, Text, NutchWritable> { + private Text newKey = new Text(); + + public void map(WritableComparable<?> key, Writable value, + OutputCollector<Text, NutchWritable> collector, Reporter reporter) + throws IOException { + // convert on the fly from old formats with UTF8 keys. + // UTF8 deprecated and replaced by Text. + if (key instanceof Text) { + newKey.set(key.toString()); + key = newKey; + } + collector.collect((Text) key, new NutchWritable(value)); + } + + } + + /** Implements a text output format */ + public static class TextOutputFormat extends + FileOutputFormat<WritableComparable<?>, Writable> { + public RecordWriter<WritableComparable<?>, Writable> getRecordWriter( + final FileSystem fs, JobConf job, String name, + final Progressable progress) throws IOException { + + final Path segmentDumpFile = new Path( + FileOutputFormat.getOutputPath(job), name); + + // Get the old copy out of the way + if (fs.exists(segmentDumpFile)) + fs.delete(segmentDumpFile, true); + + final PrintStream printStream = new PrintStream( + fs.create(segmentDumpFile)); + return new RecordWriter<WritableComparable<?>, Writable>() { + public synchronized void write(WritableComparable<?> key, Writable value) + throws IOException { + printStream.println(value); + } + + public synchronized void close(Reporter reporter) throws IOException { + printStream.close(); + } + }; + } + } + + public SegmentReader() { + super(null); + } + + public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, + boolean pa, boolean pd, boolean pt) { + super(conf); + this.co = co; + this.fe = fe; + this.ge = ge; + this.pa = pa; + this.pd = pd; + this.pt = pt; + try { + this.fs = FileSystem.get(getConf()); + } catch (IOException e) { + LOG.error("IOException:", e); + } + } + + public void configure(JobConf job) { + setConf(job); + this.co = getConf().getBoolean("segment.reader.co", true); + this.fe = getConf().getBoolean("segment.reader.fe", true); + this.ge = getConf().getBoolean("segment.reader.ge", true); + this.pa = getConf().getBoolean("segment.reader.pa", true); + this.pd = getConf().getBoolean("segment.reader.pd", true); + this.pt = getConf().getBoolean("segment.reader.pt", true); + try { + this.fs = FileSystem.get(getConf()); + } catch (IOException e) { + LOG.error("IOException:", e); + } + } + + private JobConf createJobConf() { + JobConf job = new NutchJob(getConf()); + job.setBoolean("segment.reader.co", this.co); + job.setBoolean("segment.reader.fe", this.fe); + job.setBoolean("segment.reader.ge", this.ge); + job.setBoolean("segment.reader.pa", this.pa); + job.setBoolean("segment.reader.pd", this.pd); + job.setBoolean("segment.reader.pt", this.pt); + return job; + } + + public void close() { + } + + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<Text, Text> output, Reporter reporter) throws IOException { + StringBuffer dump = new 
StringBuffer(); + + dump.append("\nRecno:: ").append(recNo++).append("\n"); + dump.append("URL:: " + key.toString() + "\n"); + while (values.hasNext()) { + Writable value = values.next().get(); // unwrap + if (value instanceof CrawlDatum) { + dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString()); + } else if (value instanceof Content) { + dump.append("\nContent::\n").append(((Content) value).toString()); + } else if (value instanceof ParseData) { + dump.append("\nParseData::\n").append(((ParseData) value).toString()); + } else if (value instanceof ParseText) { + dump.append("\nParseText::\n").append(((ParseText) value).toString()); + } else if (LOG.isWarnEnabled()) { + LOG.warn("Unrecognized type: " + value.getClass()); + } + } + output.collect(key, new Text(dump.toString())); + } + + public void dump(Path segment, Path output) throws IOException { + + if (LOG.isInfoEnabled()) { + LOG.info("SegmentReader: dump segment: " + segment); + } + + JobConf job = createJobConf(); + job.setJobName("read " + segment); + + if (ge) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.GENERATE_DIR_NAME)); + if (fe) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.FETCH_DIR_NAME)); + if (pa) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.PARSE_DIR_NAME)); + if (co) + FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); + if (pd) + FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); + if (pt) + FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); + + job.setInputFormat(SequenceFileInputFormat.class); + job.setMapperClass(InputCompatMapper.class); + job.setReducerClass(SegmentReader.class); + + Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-" + + new java.util.Random().nextInt()); + fs.delete(tempDir, true); + + FileOutputFormat.setOutputPath(job, tempDir); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NutchWritable.class); + + JobClient.runJob(job); + + // concatenate the output + Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump")); + + // remove the old file + fs.delete(dumpFile, true); + FileStatus[] fstats = fs.listStatus(tempDir, + HadoopFSUtil.getPassAllFilter()); + Path[] files = HadoopFSUtil.getPaths(fstats); + + PrintWriter writer = null; + int currentRecordNumber = 0; + if (files.length > 0) { + writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter( + fs.create(dumpFile)))); + try { + for (int i = 0; i < files.length; i++) { + Path partFile = files[i]; + try { + currentRecordNumber = append(fs, job, partFile, writer, + currentRecordNumber); + } catch (IOException exception) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't copy the content of " + partFile.toString() + + " into " + dumpFile.toString()); + LOG.warn(exception.getMessage()); + } + } + } + } finally { + writer.close(); + } + } + fs.delete(tempDir, true); + if (LOG.isInfoEnabled()) { + LOG.info("SegmentReader: done"); + } + } + + /** Appends two files and updates the Recno counter */ + private int append(FileSystem fs, Configuration conf, Path src, + PrintWriter writer, int currentRecordNumber) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader( + fs.open(src))); + try { + String line = reader.readLine(); + while (line != null) { + if (line.startsWith("Recno:: ")) { + line = "Recno:: " + currentRecordNumber++; + } + writer.println(line); + line = 
reader.readLine(); + } + return currentRecordNumber; + } finally { + reader.close(); + } + } + + private static final String[][] keys = new String[][] { + { "co", "Content::\n" }, { "ge", "Crawl Generate::\n" }, + { "fe", "Crawl Fetch::\n" }, { "pa", "Crawl Parse::\n" }, + { "pd", "ParseData::\n" }, { "pt", "ParseText::\n" } }; + + public void get(final Path segment, final Text key, Writer writer, + final Map<String, List<Writable>> results) throws Exception { + LOG.info("SegmentReader: get '" + key + "'"); + ArrayList<Thread> threads = new ArrayList<Thread>(); + if (co) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + Content.DIR_NAME), key); + results.put("co", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (fe) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + CrawlDatum.FETCH_DIR_NAME), key); + results.put("fe", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (ge) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getSeqRecords(new Path(segment, + CrawlDatum.GENERATE_DIR_NAME), key); + results.put("ge", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pa) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getSeqRecords(new Path(segment, + CrawlDatum.PARSE_DIR_NAME), key); + results.put("pa", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pd) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + ParseData.DIR_NAME), key); + results.put("pd", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pt) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + ParseText.DIR_NAME), key); + results.put("pt", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + Iterator<Thread> it = threads.iterator(); + while (it.hasNext()) + it.next().start(); + int cnt; + do { + cnt = 0; + try { + Thread.sleep(5000); + } catch (Exception e) { + } + ; + it = threads.iterator(); + while (it.hasNext()) { + if (it.next().isAlive()) + cnt++; + } + if ((cnt > 0) && (LOG.isDebugEnabled())) { + LOG.debug("(" + cnt + " to retrieve)"); + } + } while (cnt > 0); + for (int i = 0; i < keys.length; i++) { + List<Writable> res = results.get(keys[i][0]); + if (res != null && res.size() > 0) { + for (int k = 0; k < res.size(); k++) { + writer.write(keys[i][1]); + writer.write(res.get(k) + "\n"); + } + } + writer.flush(); + } + } + + private List<Writable> getMapRecords(Path dir, Text key) throws Exception { + MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, + getConf()); + ArrayList<Writable> res = new ArrayList<Writable>(); + Class<?> keyClass = readers[0].getKeyClass(); + Class<?> valueClass = readers[0].getValueClass(); + if (!keyClass.getName().equals("org.apache.hadoop.io.Text")) + throw new IOException("Incompatible key (" + keyClass.getName() + ")"); + Writable value = (Writable) valueClass.newInstance(); + // we don't know the partitioning schema + for (int i = 0; i < readers.length; i++) { + if (readers[i].get(key, value) != null) { + res.add(value); + value = (Writable) valueClass.newInstance(); + Text aKey = (Text) keyClass.newInstance(); + while (readers[i].next(aKey, value) && aKey.equals(key)) { + 
res.add(value); + value = (Writable) valueClass.newInstance(); + } + } + readers[i].close(); + } + return res; + } + + private List<Writable> getSeqRecords(Path dir, Text key) throws Exception { + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders( + getConf(), dir); + ArrayList<Writable> res = new ArrayList<Writable>(); + Class<?> keyClass = readers[0].getKeyClass(); + Class<?> valueClass = readers[0].getValueClass(); + if (!keyClass.getName().equals("org.apache.hadoop.io.Text")) + throw new IOException("Incompatible key (" + keyClass.getName() + ")"); + Writable aKey = (Writable) keyClass.newInstance(); + Writable value = (Writable) valueClass.newInstance(); + for (int i = 0; i < readers.length; i++) { + while (readers[i].next(aKey, value)) { + if (aKey.equals(key)) { + res.add(value); + value = (Writable) valueClass.newInstance(); + } + } + readers[i].close(); + } + return res; + } + + public static class SegmentReaderStats { + public long start = -1L; + public long end = -1L; + public long generated = -1L; + public long fetched = -1L; + public long fetchErrors = -1L; + public long parsed = -1L; + public long parseErrors = -1L; + } + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + + public void list(List<Path> dirs, Writer writer) throws Exception { + writer + .write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n"); + for (int i = 0; i < dirs.size(); i++) { + Path dir = dirs.get(i); + SegmentReaderStats stats = new SegmentReaderStats(); + getStats(dir, stats); + writer.write(dir.getName() + "\t"); + if (stats.generated == -1) + writer.write("?"); + else + writer.write(stats.generated + ""); + writer.write("\t\t"); + if (stats.start == -1) + writer.write("?\t"); + else + writer.write(sdf.format(new Date(stats.start))); + writer.write("\t"); + if (stats.end == -1) + writer.write("?"); + else + writer.write(sdf.format(new Date(stats.end))); + writer.write("\t"); + if (stats.fetched == -1) + writer.write("?"); + else + writer.write(stats.fetched + ""); + writer.write("\t"); + if (stats.parsed == -1) + writer.write("?"); + else + writer.write(stats.parsed + ""); + writer.write("\n"); + writer.flush(); + } + } + + public void getStats(Path segment, final SegmentReaderStats stats) + throws Exception { + long cnt = 0L; + Text key = new Text(); + + if (ge) { + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders( + getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); + for (int i = 0; i < readers.length; i++) { + while (readers[i].next(key)) + cnt++; + readers[i].close(); + } + stats.generated = cnt; + } + + if (fe) { + Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME); + if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) { + cnt = 0L; + long start = Long.MAX_VALUE; + long end = Long.MIN_VALUE; + CrawlDatum value = new CrawlDatum(); + MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, + getConf()); + for (int i = 0; i < mreaders.length; i++) { + while (mreaders[i].next(key, value)) { + cnt++; + if (value.getFetchTime() < start) + start = value.getFetchTime(); + if (value.getFetchTime() > end) + end = value.getFetchTime(); + } + mreaders[i].close(); + } + stats.start = start; + stats.end = end; + stats.fetched = cnt; + } + } + + if (pd) { + Path parseDir = new Path(segment, ParseData.DIR_NAME); + if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) { + cnt = 0L; + long errors = 0L; + ParseData value = new ParseData(); + MapFile.Reader[] 
mreaders = MapFileOutputFormat.getReaders(fs, parseDir, + getConf()); + for (int i = 0; i < mreaders.length; i++) { + while (mreaders[i].next(key, value)) { + cnt++; + if (!value.getStatus().isSuccess()) + errors++; + } + mreaders[i].close(); + } + stats.parsed = cnt; + stats.parseErrors = errors; + } + } + } + + private static final int MODE_DUMP = 0; + + private static final int MODE_LIST = 1; + + private static final int MODE_GET = 2; + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + usage(); + return; + } + int mode = -1; + if (args[0].equals("-dump")) + mode = MODE_DUMP; + else if (args[0].equals("-list")) + mode = MODE_LIST; + else if (args[0].equals("-get")) + mode = MODE_GET; + + boolean co = true; + boolean fe = true; + boolean ge = true; + boolean pa = true; + boolean pd = true; + boolean pt = true; + // collect general options + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-nocontent")) { + co = false; + args[i] = null; + } else if (args[i].equals("-nofetch")) { + fe = false; + args[i] = null; + } else if (args[i].equals("-nogenerate")) { + ge = false; + args[i] = null; + } else if (args[i].equals("-noparse")) { + pa = false; + args[i] = null; + } else if (args[i].equals("-noparsedata")) { + pd = false; + args[i] = null; + } else if (args[i].equals("-noparsetext")) { + pt = false; + args[i] = null; + } + } + Configuration conf = NutchConfiguration.create(); + final FileSystem fs = FileSystem.get(conf); + SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, + pt); + // collect required args + switch (mode) { + case MODE_DUMP: + String input = args[1]; + if (input == null) { + System.err.println("Missing required argument: <segment_dir>"); + usage(); + return; + } + String output = args.length > 2 ? args[2] : null; + if (output == null) { + System.err.println("Missing required argument: <output>"); + usage(); + return; + } + segmentReader.dump(new Path(input), new Path(output)); + return; + case MODE_LIST: + ArrayList<Path> dirs = new ArrayList<Path>(); + for (int i = 1; i < args.length; i++) { + if (args[i] == null) + continue; + if (args[i].equals("-dir")) { + Path dir = new Path(args[++i]); + FileStatus[] fstats = fs.listStatus(dir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] files = HadoopFSUtil.getPaths(fstats); + if (files != null && files.length > 0) { + dirs.addAll(Arrays.asList(files)); + } + } else + dirs.add(new Path(args[i])); + } + segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8")); + return; + case MODE_GET: + input = args[1]; + if (input == null) { + System.err.println("Missing required argument: <segment_dir>"); + usage(); + return; + } + String key = args.length > 2 ? args[2] : null; + if (key == null) { + System.err.println("Missing required argument: <keyValue>"); + usage(); + return; + } + segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter( + System.out, "UTF-8"), new HashMap<String, List<Writable>>()); + return; + default: + System.err.println("Invalid operation: " + args[0]); + usage(); + return; + } + } + + private static void usage() { + System.err + .println("Usage: SegmentReader (-dump ... | -list ... | -get ...) 
[general options]\n"); + System.err.println("* General options:"); + System.err.println("\t-nocontent\tignore content directory"); + System.err.println("\t-nofetch\tignore crawl_fetch directory"); + System.err.println("\t-nogenerate\tignore crawl_generate directory"); + System.err.println("\t-noparse\tignore crawl_parse directory"); + System.err.println("\t-noparsedata\tignore parse_data directory"); + System.err.println("\t-noparsetext\tignore parse_text directory"); + System.err.println(); + System.err + .println("* SegmentReader -dump <segment_dir> <output> [general options]"); + System.err + .println(" Dumps content of a <segment_dir> as a text file to <output>.\n"); + System.err.println("\t<segment_dir>\tname of the segment directory."); + System.err + .println("\t<output>\tname of the (non-existent) output directory."); + System.err.println(); + System.err + .println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]"); + System.err + .println(" List a synopsis of segments in specified directories, or all segments in"); + System.err + .println(" a directory <segments>, and print it on System.out\n"); + System.err + .println("\t<segment_dir1> ...\tlist of segment directories to process"); + System.err + .println("\t-dir <segments>\t\tdirectory that contains multiple segments"); + System.err.println(); + System.err + .println("* SegmentReader -get <segment_dir> <keyValue> [general options]"); + System.err + .println(" Get a specified record from a segment, and print it on System.out.\n"); + System.err.println("\t<segment_dir>\tname of the segment directory."); + System.err.println("\t<keyValue>\tvalue of the key (url)."); + System.err + .println("\t\tNote: put double-quotes around strings with spaces."); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java new file mode 100644 index 0000000..ecc0c26 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A segment stores all data from on generate/fetch/update cycle: + * fetch list, protocol status, raw content, parsed content, and extracted outgoing links. 
+ */ +package org.apache.nutch.segment; + http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java new file mode 100644 index 0000000..c71cfa9 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.service; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.service.model.request.NutchConfig; + +public interface ConfManager { + + public Configuration get(String confId); + + public Map<String, String> getAsMap(String confId); + + public void setProperty(String confId, String propName, String propValue); + + public Set<String> list(); + + public String create(NutchConfig nutchConfig); + + public void delete(String confId); +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java new file mode 100644 index 0000000..20346fc --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.service; + +import java.util.Collection; +import org.apache.nutch.service.model.request.JobConfig; +import org.apache.nutch.service.model.response.JobInfo; +import org.apache.nutch.service.model.response.JobInfo.State; + +public interface JobManager { + + public static enum JobType{ + INJECT, GENERATE, FETCH, PARSE, UPDATEDB, INDEX, READDB, CLASS, INVERTLINKS, DEDUP + }; + public Collection<JobInfo> list(String crawlId, State state); + + public JobInfo get(String crawlId, String id); + + /** + * Creates the specified job. + * @param jobConfig + * @return JobInfo + */ + public JobInfo create(JobConfig jobConfig); + + public boolean abort(String crawlId, String id); + + public boolean stop(String crawlId, String id); +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java new file mode 100644 index 0000000..00bb78f --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.service; + +import java.io.FileNotFoundException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.service.impl.SequenceReader; +import org.apache.nutch.util.NutchConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface NutchReader { + + public static final Logger LOG = LoggerFactory.getLogger(NutchReader.class); + public static final Configuration conf = NutchConfiguration.create(); + + public List read(String path) throws FileNotFoundException; + public List head(String path, int nrows) throws FileNotFoundException; + public List slice(String path, int start, int end) throws FileNotFoundException; + public int count(String path) throws FileNotFoundException; +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java new file mode 100644 index 0000000..e206707 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; + +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.cli.CommandLine; +import org.apache.cxf.binding.BindingFactoryManager; +import org.apache.cxf.jaxrs.JAXRSBindingFactory; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.nutch.fetcher.FetchNodeDb; +import org.apache.nutch.service.impl.ConfManagerImpl; +import org.apache.nutch.service.impl.JobFactory; +import org.apache.nutch.service.impl.JobManagerImpl; +import org.apache.nutch.service.impl.NutchServerPoolExecutor; +import org.apache.nutch.service.model.response.JobInfo; +import org.apache.nutch.service.model.response.JobInfo.State; +import org.apache.nutch.service.resources.AdminResource; +import org.apache.nutch.service.resources.ConfigResource; +import org.apache.nutch.service.resources.DbResource; +import org.apache.nutch.service.resources.JobResource; +import org.apache.nutch.service.resources.ReaderResouce; +import org.apache.nutch.service.resources.SeedResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Queues; + +public class NutchServer { + + private static final Logger LOG = LoggerFactory.getLogger(NutchServer.class); + + private static final String LOCALHOST = "localhost"; + private static final Integer DEFAULT_PORT = 8081; + private static final int JOB_CAPACITY = 100; + + private static Integer port = DEFAULT_PORT; + private static String host = LOCALHOST; + + private static final String CMD_HELP = "help"; + private static final String CMD_PORT = "port"; + private static final String CMD_HOST = "host"; + + private long started; + private boolean running; + private ConfManager configManager; + private JobManager jobManager; + private JAXRSServerFactoryBean sf; + + private static FetchNodeDb fetchNodeDb; + + private static NutchServer server; + + static { + server = new NutchServer(); + } + + private NutchServer() { + configManager = new ConfManagerImpl(); + BlockingQueue<Runnable> runnables = Queues.newArrayBlockingQueue(JOB_CAPACITY); + NutchServerPoolExecutor executor = new NutchServerPoolExecutor(10, JOB_CAPACITY, 1, TimeUnit.HOURS, runnables); + jobManager = new JobManagerImpl(new JobFactory(), configManager, executor); + 
fetchNodeDb = FetchNodeDb.getInstance(); + + sf = new JAXRSServerFactoryBean(); + BindingFactoryManager manager = sf.getBus().getExtension(BindingFactoryManager.class); + JAXRSBindingFactory factory = new JAXRSBindingFactory(); + factory.setBus(sf.getBus()); + manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory); + sf.setResourceClasses(getClasses()); + sf.setResourceProviders(getResourceProviders()); + sf.setProvider(new JacksonJaxbJsonProvider()); + + } + + public static NutchServer getInstance() { + return server; + } + + protected static void startServer() { + server.start(); + } + + private void start() { + LOG.info("Starting NutchServer on {}:{} ...", host, port); + try{ + String address = "http://" + host + ":" + port; + sf.setAddress(address); + sf.create(); + }catch(Exception e){ + throw new IllegalStateException("Server could not be started", e); + } + + started = System.currentTimeMillis(); + running = true; + LOG.info("Started Nutch Server on {}:{} at {}", new Object[] {host, port, started}); + } + + private List<Class<?>> getClasses() { + List<Class<?>> resources = new ArrayList<Class<?>>(); + resources.add(JobResource.class); + resources.add(ConfigResource.class); + resources.add(DbResource.class); + resources.add(AdminResource.class); + resources.add(SeedResource.class); + resources.add(ReaderResouce.class); + return resources; + } + + private List<ResourceProvider> getResourceProviders() { + List<ResourceProvider> resourceProviders = new ArrayList<ResourceProvider>(); + resourceProviders.add(new SingletonResourceProvider(getConfManager())); + return resourceProviders; + } + + public ConfManager getConfManager() { + return configManager; + } + + public JobManager getJobManager() { + return jobManager; + } + + public FetchNodeDb getFetchNodeDb(){ + return fetchNodeDb; + } + + public boolean isRunning(){ + return running; + } + + public long getStarted(){ + return started; + } + + public static void main(String[] args) throws ParseException { + CommandLineParser parser = new PosixParser(); + Options options = createOptions(); + CommandLine commandLine = parser.parse(options, args); + if (commandLine.hasOption(CMD_HELP)) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("NutchServer", options, true); + return; + } + + if (commandLine.hasOption(CMD_PORT)) { + port = Integer.parseInt(commandLine.getOptionValue(CMD_PORT)); + } + + if (commandLine.hasOption(CMD_HOST)) { + host = commandLine.getOptionValue(CMD_HOST); + } + + startServer(); + } + + private static Options createOptions() { + Options options = new Options(); + + OptionBuilder.withDescription("Show this help"); + options.addOption(OptionBuilder.create(CMD_HELP)); + + OptionBuilder.withArgName("port"); + OptionBuilder.hasOptionalArg(); + OptionBuilder.withDescription("The port to run the Nutch Server. Default port 8081"); + options.addOption(OptionBuilder.create(CMD_PORT)); + + OptionBuilder.withArgName("host"); + OptionBuilder.hasOptionalArg(); + OptionBuilder.withDescription("The host to bind the Nutch Server to. 
Default is localhost."); + options.addOption(OptionBuilder.create(CMD_HOST)); + + return options; + } + + public boolean canStop(boolean force){ + if(force) + return true; + + Collection<JobInfo> jobs = getJobManager().list(null, State.RUNNING); + return jobs.isEmpty(); + } + + protected static void setPort(int port) { + NutchServer.port = port; + } + + public int getPort() { + return port; + } + + public void stop() { + System.exit(0); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java new file mode 100644 index 0000000..0c08ce4 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.service.impl; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.service.ConfManager; +import org.apache.nutch.service.model.request.NutchConfig; +import org.apache.nutch.service.resources.ConfigResource; +import org.apache.nutch.util.NutchConfiguration; + +import com.google.common.collect.Maps; + +public class ConfManagerImpl implements ConfManager { + + + private Map<String, Configuration> configurations = Maps.newConcurrentMap(); + + private AtomicInteger newConfigId = new AtomicInteger(); + + public ConfManagerImpl() { + configurations.put(ConfigResource.DEFAULT, NutchConfiguration.create()); + } + + /** + * Returns the configuration associated with the given confId + */ + public Configuration get(String confId) { + if (confId == null) { + return configurations.get(ConfigResource.DEFAULT); + } + return configurations.get(confId); + } + + public Map<String, String> getAsMap(String confId) { + Configuration configuration = configurations.get(confId); + if (configuration == null) { + return Collections.emptyMap(); + } + + Iterator<Entry<String, String>> iterator = configuration.iterator(); + Map<String, String> configMap = Maps.newTreeMap(); + while (iterator.hasNext()) { + Entry<String, String> entry = iterator.next(); + configMap.put(entry.getKey(), entry.getValue()); + } + return configMap; + } + + /** + * Sets the given property in the configuration associated with the confId + */ + public void setProperty(String confId, String propName, String propValue) { + if (!configurations.containsKey(confId)) { + throw new IllegalArgumentException("Unknown configId '" + confId + "'"); + } + Configuration conf = configurations.get(confId); + conf.set(propName, propValue); + } + + public Set<String> list() { + return configurations.keySet(); + } + + /** + * Creates a new configuration based on the values provided.
+ * @param nutchConfig + * @return String - confId + */ + public String create(NutchConfig nutchConfig) { + if (StringUtils.isBlank(nutchConfig.getConfigId())) { + nutchConfig.setConfigId(String.valueOf(newConfigId.incrementAndGet())); + } + + if (!canCreate(nutchConfig)) { + throw new IllegalArgumentException("Config already exists."); + } + + createHadoopConfig(nutchConfig); + return nutchConfig.getConfigId(); + } + + + public void delete(String confId) { + configurations.remove(confId); + } + + private boolean canCreate(NutchConfig nutchConfig) { + if (nutchConfig.isForce()) { + return true; + } + if (!configurations.containsKey(nutchConfig.getConfigId())) { + return true; + } + return false; + } + + private void createHadoopConfig(NutchConfig nutchConfig) { + Configuration conf = NutchConfiguration.create(); + configurations.put(nutchConfig.getConfigId(), conf); + + if (MapUtils.isEmpty(nutchConfig.getParams())) { + return; + } + for (Entry<String, String> e : nutchConfig.getParams().entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java new file mode 100644 index 0000000..a74e362 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.service.impl; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.nutch.service.JobManager.JobType; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.DeduplicationJob; +import org.apache.nutch.crawl.Generator; +import org.apache.nutch.crawl.Injector; +import org.apache.nutch.crawl.LinkDb; +import org.apache.nutch.fetcher.Fetcher; +import org.apache.nutch.indexer.IndexingJob; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.util.NutchTool; + +import com.google.common.collect.Maps; + +public class JobFactory { + private static Map<JobType, Class<?
extends NutchTool>> typeToClass; + + static { + typeToClass = Maps.newHashMap(); + typeToClass.put(JobType.INJECT, Injector.class); + typeToClass.put(JobType.GENERATE, Generator.class); + typeToClass.put(JobType.FETCH, Fetcher.class); + typeToClass.put(JobType.PARSE, ParseSegment.class); + typeToClass.put(JobType.INDEX, IndexingJob.class); + typeToClass.put(JobType.UPDATEDB, CrawlDb.class); + typeToClass.put(JobType.INVERTLINKS, LinkDb.class); + typeToClass.put(JobType.DEDUP, DeduplicationJob.class); + } + + public NutchTool createToolByType(JobType type, Configuration conf) { + if (!typeToClass.containsKey(type)) { + return null; + } + Class<? extends NutchTool> clz = typeToClass.get(type); + return createTool(clz, conf); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public NutchTool createToolByClassName(String className, Configuration conf) { + try { + Class clz = Class.forName(className); + return createTool(clz, conf); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + + private NutchTool createTool(Class<? extends NutchTool> clz, + Configuration conf) { + return ReflectionUtils.newInstance(clz, conf); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java new file mode 100644 index 0000000..a915457 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.service.impl; + +import java.util.Collection; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.service.ConfManager; +import org.apache.nutch.service.JobManager; +import org.apache.nutch.service.model.request.JobConfig; +import org.apache.nutch.service.model.response.JobInfo; +import org.apache.nutch.service.model.response.JobInfo.State; +import org.apache.nutch.util.NutchTool; + +public class JobManagerImpl implements JobManager { + + private JobFactory jobFactory; + private NutchServerPoolExecutor executor; + private ConfManager configManager; + + public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) { + this.jobFactory = jobFactory; + this.configManager = configManager; + this.executor = executor; + } + + @Override + public JobInfo create(JobConfig jobConfig) { + if (jobConfig.getArgs() == null) { + throw new IllegalArgumentException("Arguments cannot be null!"); + } + Configuration conf = cloneConfiguration(jobConfig.getConfId()); + NutchTool tool = createTool(jobConfig, conf); + JobWorker worker = new JobWorker(jobConfig, conf, tool); + executor.execute(worker); + executor.purge(); + return worker.getInfo(); + } + + private Configuration cloneConfiguration(String confId) { + Configuration conf = configManager.get(confId); + if (conf == null) { + throw new IllegalArgumentException("Unknown confId " + confId); + } + return new Configuration(conf); + } + + @Override + public Collection<JobInfo> list(String crawlId, State state) { + if (state == null || state == State.ANY) { + return executor.getAllJobs(); + } + if (state == State.RUNNING || state == State.IDLE) { + return executor.getJobRunning(); + } + return executor.getJobHistory(); + } + + @Override + public JobInfo get(String crawlId, String jobId) { + return executor.getInfo(jobId); + } + + @Override + public boolean abort(String crawlId, String id) { + return executor.findWorker(id).killJob(); + } + + @Override + public boolean stop(String crawlId, String id) { + return executor.findWorker(id).stopJob(); + } + + private NutchTool createTool(JobConfig jobConfig, Configuration conf){ + if(StringUtils.isNotBlank(jobConfig.getJobClassName())){ + return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf); + } + return jobFactory.createToolByType(jobConfig.getType(), conf); + } +}
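[Editor's note] Taken together, JobManagerImpl.create() above clones the named configuration, resolves a NutchTool through JobFactory, and hands both to a JobWorker on the executor. The sketch below walks the same resolution path outside the server. It is not part of the commit: the "crawl-1" id, the http.agent.name value, and the class name JobResolutionSketch are illustrative, and it assumes NutchConfig is a plain request bean with a no-arg constructor (consistent with its use in ConfManagerImpl).

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.service.JobManager.JobType;
import org.apache.nutch.service.impl.ConfManagerImpl;
import org.apache.nutch.service.impl.JobFactory;
import org.apache.nutch.service.model.request.NutchConfig;
import org.apache.nutch.util.NutchTool;

public class JobResolutionSketch {
  public static void main(String[] args) {
    // Register a named configuration; ConfManagerImpl falls back to a
    // generated numeric id when none is supplied.
    ConfManagerImpl confManager = new ConfManagerImpl();
    NutchConfig nutchConfig = new NutchConfig(); // assumption: no-arg bean
    nutchConfig.setConfigId("crawl-1");          // illustrative confId
    String confId = confManager.create(nutchConfig);
    confManager.setProperty(confId, "http.agent.name", "my-crawler"); // illustrative property

    // JobManagerImpl clones the stored Configuration before each run, so
    // per-job changes never leak back into the registry.
    Configuration conf = new Configuration(confManager.get(confId));

    // Resolve the tool for a job type; JobType.INJECT maps to Injector.class
    // in JobFactory's static type-to-class table.
    NutchTool tool = new JobFactory().createToolByType(JobType.INJECT, conf);
  }
}

The same lookup also backs createToolByClassName(), which JobManagerImpl prefers whenever JobConfig carries an explicit job class name.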

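[Editor's note] NutchServer's entry point is driven entirely through main(): parse -port/-host, then bring up the CXF JAX-RS endpoint. A hedged example of launching it in-process on a non-default port (8899 and the class name ServerLaunchSketch are arbitrary):

import org.apache.commons.cli.ParseException;
import org.apache.nutch.service.NutchServer;

public class ServerLaunchSketch {
  public static void main(String[] args) throws ParseException {
    // Equivalent to the command line "NutchServer -port 8899": the server
    // binds to http://localhost:8899 and registers the JAX-RS resources
    // (JobResource, ConfigResource, DbResource, AdminResource, ...).
    NutchServer.main(new String[] { "-port", "8899" });

    // main() returns once CXF is up; the singleton stays available for
    // in-process queries.
    NutchServer server = NutchServer.getInstance();
    System.out.println("running=" + server.isRunning() + " port=" + server.getPort());
  }
}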