This is an automated email from the ASF dual-hosted git repository. snagel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
commit d758a31bbee0807bcbc92a591668076cfa95aeb1 Author: Sebastian Nagel <[email protected]> AuthorDate: Fri Dec 8 22:41:05 2017 +0100 NUTCH-2474 CrawlDbReader -stats fails with ClassCastException - replace CrawlDbStatCombiner by CrawlDbStatReducer and ensure that data is properly processed independently whether and how often combiner is called - simplify calculation of minimum and maximum --- src/java/org/apache/nutch/crawl/CrawlDbReader.java | 273 +++++++++------------ 1 file changed, 114 insertions(+), 159 deletions(-) diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java index 42b5a3b..117aa7f 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java @@ -202,14 +202,24 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { output.collect(new Text("status " + value.getStatus()), COUNT_1); output.collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1); - output.collect(new Text("sc"), new NutchWritable( - new FloatWritable(value.getScore()))); + + NutchWritable score = new NutchWritable( + new FloatWritable(value.getScore())); + output.collect(new Text("sc"), score); + output.collect(new Text("sct"), score); + output.collect(new Text("scd"), score); + // fetch time (in minutes to prevent from overflows when summing up) - output.collect(new Text("ft"), new NutchWritable( - new LongWritable(value.getFetchTime() / (1000 * 60)))); + NutchWritable fetchTime = new NutchWritable( + new LongWritable(value.getFetchTime() / (1000 * 60))); + output.collect(new Text("ft"), fetchTime); + output.collect(new Text("ftt"), fetchTime); + // fetch interval (in seconds) - output.collect(new Text("fi"), - new NutchWritable(new LongWritable(value.getFetchInterval()))); + NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval())); + output.collect(new Text("fi"), fetchInterval); + output.collect(new Text("fit"), fetchInterval); + if (sort) { URL u = new URL(key.toString()); String host = u.getHost(); @@ -219,88 +229,6 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { } } - public static class CrawlDbStatCombiner implements - Reducer<Text, NutchWritable, Text, NutchWritable> { - LongWritable val = new LongWritable(); - - public CrawlDbStatCombiner() { - } - - public void configure(JobConf job) { - } - - public void close() { - } - - private void reduceMinMaxTotal(String keyPrefix, Iterator<NutchWritable> values, - OutputCollector<Text, NutchWritable> output, Reporter reporter) - throws IOException { - long total = 0; - long min = Long.MAX_VALUE; - long max = Long.MIN_VALUE; - while (values.hasNext()) { - long cnt = ((LongWritable) values.next().get()).get(); - if (cnt < min) - min = cnt; - if (cnt > max) - max = cnt; - total += cnt; - } - output.collect(new Text(keyPrefix + "n"), - new NutchWritable(new LongWritable(min))); - output.collect(new Text(keyPrefix + "x"), - new NutchWritable(new LongWritable(max))); - output.collect(new Text(keyPrefix + "t"), - new NutchWritable(new LongWritable(total))); - } - - private void reduceMinMaxTotalFloat(String keyPrefix, Iterator<NutchWritable> values, - OutputCollector<Text, NutchWritable> output, Reporter reporter) - throws IOException { - double total = 0; - float min = Float.MAX_VALUE; - float max = Float.MIN_VALUE; - TDigest tdigest = TDigest.createMergingDigest(100.0); - while (values.hasNext()) { - float val = ((FloatWritable) values.next().get()).get(); - tdigest.add(val); - if (val < min) - min = val; - if (val > max) - max = val; - total += val; - } - output.collect(new Text(keyPrefix + "n"), - new NutchWritable(new FloatWritable(min))); - output.collect(new Text(keyPrefix + "x"), - new NutchWritable(new FloatWritable(max))); - output.collect(new Text(keyPrefix + "t"), - new NutchWritable(new FloatWritable((float) total))); - ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize()); - tdigest.asSmallBytes(tdigestBytes); - output.collect(new Text(keyPrefix + "d"), - new NutchWritable(new BytesWritable(tdigestBytes.array()))); - } - - public void reduce(Text key, Iterator<NutchWritable> values, - OutputCollector<Text, NutchWritable> output, Reporter reporter) - throws IOException { - val.set(0L); - String k = key.toString(); - if (k.equals("sc")) { - reduceMinMaxTotalFloat(k, values, output, reporter); - } else if (k.equals("ft") || k.equals("fi")) { - reduceMinMaxTotal(k, values, output, reporter); - } else { - while (values.hasNext()) { - LongWritable cnt = (LongWritable) values.next().get(); - val.set(val.get() + cnt.get()); - } - output.collect(key, new NutchWritable(val)); - } - } - } - public static class CrawlDbStatReducer implements Reducer<Text, NutchWritable, Text, NutchWritable> { public void configure(JobConf job) { @@ -314,7 +242,8 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { throws IOException { String k = key.toString(); - if (k.equals("T")) { + if (k.equals("T") || k.startsWith("status") || k.startsWith("retry") + || k.equals("ftt") || k.equals("fit")) { // sum all values for this key long sum = 0; while (values.hasNext()) { @@ -323,68 +252,59 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { } // output sum output.collect(key, new NutchWritable(new LongWritable(sum))); - } else if (k.startsWith("status") || k.startsWith("retry")) { - LongWritable cnt = new LongWritable(); - while (values.hasNext()) { - LongWritable val = (LongWritable) values.next().get(); - cnt.set(cnt.get() + val.get()); - } - output.collect(key, new NutchWritable(cnt)); - } else if (k.equals("scx")) { - FloatWritable max = new FloatWritable(Float.MIN_VALUE); - while (values.hasNext()) { - FloatWritable val = (FloatWritable) values.next().get(); - if (max.get() < val.get()) - max.set(val.get()); - } - output.collect(key, new NutchWritable(max)); - } else if (k.equals("ftx") || k.equals("fix")) { - LongWritable cnt = new LongWritable(Long.MIN_VALUE); - while (values.hasNext()) { - LongWritable val = (LongWritable) values.next().get(); - if (cnt.get() < val.get()) - cnt.set(val.get()); - } - output.collect(key, new NutchWritable(cnt)); - } else if (k.equals("scn")) { - FloatWritable min = new FloatWritable(Float.MAX_VALUE); + } else if (k.equals("sc")) { + float min = Float.MAX_VALUE; + float max = Float.MIN_VALUE; while (values.hasNext()) { - FloatWritable val = (FloatWritable) values.next().get(); - if (min.get() > val.get()) - min.set(val.get()); + float value = ((FloatWritable) values.next().get()).get(); + if (max < value) { + max = value; + } + if (min > value) { + min = value; + } } - output.collect(key, new NutchWritable(min)); - } else if (k.equals("ftn") || k.equals("fin")) { - LongWritable cnt = new LongWritable(Long.MAX_VALUE); + output.collect(key, new NutchWritable(new FloatWritable(min))); + output.collect(key, new NutchWritable(new FloatWritable(max))); + } else if (k.equals("ft") || k.equals("fi")) { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; while (values.hasNext()) { - LongWritable val = (LongWritable) values.next().get(); - if (cnt.get() > val.get()) - cnt.set(val.get()); + long value = ((LongWritable) values.next().get()).get(); + if (max < value) { + max = value; + } + if (min > value) { + min = value; + } } - output.collect(key, new NutchWritable(cnt)); + output.collect(key, new NutchWritable(new LongWritable(min))); + output.collect(key, new NutchWritable(new LongWritable(max))); } else if (k.equals("sct")) { - FloatWritable cnt = new FloatWritable(); - while (values.hasNext()) { - FloatWritable val = (FloatWritable) values.next().get(); - cnt.set(cnt.get() + val.get()); - } - output.collect(key, new NutchWritable(cnt)); - } else if (k.equals("ftt") || k.equals("fit")) { - LongWritable cnt = new LongWritable(); + float cnt = 0.0f; while (values.hasNext()) { - LongWritable val = (LongWritable) values.next().get(); - cnt.set(cnt.get() + val.get()); + float value = ((FloatWritable) values.next().get()).get(); + cnt += value; } - output.collect(key, new NutchWritable(cnt)); + output.collect(key, new NutchWritable(new FloatWritable(cnt))); } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) { MergingDigest tdigest = null; while (values.hasNext()) { - byte[] bytes = ((BytesWritable) values.next().get()).getBytes(); - MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer.wrap(bytes)); - if (tdigest == null) { - tdigest = tdig; - } else { - tdigest.add(tdig); + Writable value = values.next().get(); + if (value instanceof BytesWritable) { + byte[] bytes = ((BytesWritable) value).getBytes(); + MergingDigest tdig = MergingDigest + .fromBytes(ByteBuffer.wrap(bytes)); + if (tdigest == null) { + tdigest = tdig; + } else { + tdigest.add(tdig); + } + } else if (value instanceof FloatWritable) { + if (tdigest == null) { + tdigest = (MergingDigest) TDigest.createMergingDigest(100.0); + } + tdigest.add(((FloatWritable) value).get()); } } ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize()); @@ -455,7 +375,7 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(CrawlDbStatMapper.class); - job.setCombinerClass(CrawlDbStatCombiner.class); + job.setCombinerClass(CrawlDbStatReducer.class); job.setReducerClass(CrawlDbStatReducer.class); FileOutputFormat.setOutputPath(job, tmpFolder); @@ -486,27 +406,57 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { stats.put(k, value.get()); continue; } - if (k.equals("scx")) { - FloatWritable fvalue = (FloatWritable) value.get(); - if (((FloatWritable) val).get() < fvalue.get()) - ((FloatWritable) val).set(fvalue.get()); - } else if (k.equals("ftx") || k.equals("fix")) { - LongWritable lvalue = (LongWritable) value.get(); - if (((LongWritable) val).get() < lvalue.get()) - ((LongWritable) val).set(lvalue.get()); - } else if (k.equals("scn")) { - FloatWritable fvalue = (FloatWritable) value.get(); - if (((FloatWritable) val).get() > fvalue.get()) - ((FloatWritable) val).set(fvalue.get()); - } else if (k.equals("ftn") || k.equals("fin")) { - LongWritable lvalue = (LongWritable) value.get(); - if (((LongWritable) val).get() > lvalue.get()) - ((LongWritable) val).set(lvalue.get()); + if (k.equals("sc")) { + float min = Float.MAX_VALUE; + float max = Float.MIN_VALUE; + if (stats.containsKey("scn")) { + min = ((FloatWritable) stats.get("scn")).get(); + } else { + min = ((FloatWritable) stats.get("sc")).get(); + } + if (stats.containsKey("scx")) { + max = ((FloatWritable) stats.get("scx")).get(); + } else { + max = ((FloatWritable) stats.get("sc")).get(); + } + float fvalue = ((FloatWritable) value.get()).get(); + if (min > fvalue) { + min = fvalue; + } + if (max < fvalue) { + max = fvalue; + } + stats.put("scn", new FloatWritable(min)); + stats.put("scx", new FloatWritable(max)); + } else if (k.equals("ft") || k.equals("fi")) { + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + String minKey = k + "n"; + String maxKey = k + "x"; + if (stats.containsKey(minKey)) { + min = ((LongWritable) stats.get(minKey)).get(); + } else if (stats.containsKey(k)) { + min = ((LongWritable) stats.get(k)).get(); + } + if (stats.containsKey(maxKey)) { + max = ((LongWritable) stats.get(maxKey)).get(); + } else if (stats.containsKey(k)) { + max = ((LongWritable) stats.get(k)).get(); + } + long lvalue = ((LongWritable) value.get()).get(); + if (min > lvalue) { + min = lvalue; + } + if (max < lvalue) { + max = lvalue; + } + stats.put(k + "n", new LongWritable(min)); + stats.put(k + "x", new LongWritable(max)); } else if (k.equals("sct")) { FloatWritable fvalue = (FloatWritable) value.get(); ((FloatWritable) val) .set(((FloatWritable) val).get() + fvalue.get()); - } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) { + } else if (k.equals("scd")) { MergingDigest tdigest = null; MergingDigest tdig = MergingDigest.fromBytes( ByteBuffer.wrap(((BytesWritable) value.get()).getBytes())); @@ -529,6 +479,11 @@ public class CrawlDbReader extends Configured implements Closeable, Tool { } reader.close(); } + // remove score, fetch interval, and fetch time + // (used for min/max calculation) + stats.remove("sc"); + stats.remove("fi"); + stats.remove("ft"); // removing the tmp folder fileSystem.delete(tmpFolder, true); return stats; -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
