Modified: nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java?rev=1039014&r1=1039013&r2=1039014&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java (original) +++ nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java Thu Nov 25 12:05:57 2010 @@ -1,20 +1,80 @@ package org.apache.nutch.util; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapreduce.Job; +import org.apache.nutch.metadata.Nutch; -public interface NutchTool { +public abstract class NutchTool extends Configured { - /** Prepares the tool. May return additional info, or null. */ - public Map<String,Object> prepare() throws Exception; - - /** Create jobs to be executed in sequence. */ - public Job[] createJobs(Object... args) throws Exception; + protected HashMap<String,Object> results = new HashMap<String,Object>(); + protected Map<String,Object> status = + Collections.synchronizedMap(new HashMap<String,Object>()); + protected Job currentJob; + protected int numJobs; + protected int currentJobNum; + + /** Runs the tool, using a map of arguments. + * May return results, or null. + */ + public abstract Map<String,Object> run(Map<String,Object> args) throws Exception; + + /** Returns relative progress of the tool, a float in range [0,1]. 
*/ + public float getProgress() { + float res = 0; + if (currentJob != null) { + try { + res = (currentJob.mapProgress() + currentJob.reduceProgress()) / 2.0f; + } catch (IOException e) { + e.printStackTrace(); + res = 0; + } catch (IllegalStateException ile) { + ile.printStackTrace(); + res = 0; + } + } + // take into account multiple jobs + if (numJobs > 1) { + res = (currentJobNum + res) / (float)numJobs; + } + status.put(Nutch.STAT_PROGRESS, res); + return res; + } + + + /** Returns current status of the running tool. */ + public Map<String,Object> getStatus() { + return status; + } - /** Post-process results of a job. */ - public Map<String,Object> postJob(int jobIndex, Job job) throws Exception; + /** Stop the job with the possibility to resume. Subclasses should + * override this, since by default it calls {@link #killJob()}. + * @return true if succeeded, false otherwise + */ + public boolean stopJob() throws Exception { + return killJob(); + } - /** Finish processing and optionally return results. */ - public Map<String,Object> finish() throws Exception; + /** + * Kill the job immediately. Clients should assume that any results + * that the job produced so far are in inconsistent state or missing. + * @return true if succeeded, false otherwise. + * @throws Exception + */ + public boolean killJob() throws Exception { + if (currentJob != null && !currentJob.isComplete()) { + try { + currentJob.killJob(); + return true; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + return false; + } } \ No newline at end of file
Added: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java?rev=1039014&view=auto ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java (added) +++ nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java Thu Nov 25 12:05:57 2010 @@ -0,0 +1,62 @@ +package org.apache.nutch.util; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.nutch.metadata.Nutch; + +public class ToolUtil { + + public static final Map<String,Object> toArgMap(Object... args) { + if (args == null) { + return null; + } + if (args.length % 2 != 0) { + throw new RuntimeException("expected pairs of argName argValue"); + } + HashMap<String,Object> res = new HashMap<String,Object>(); + for (int i = 0; i < args.length; i += 2) { + if (args[i + 1] != null) { + res.put(String.valueOf(args[i]), args[i + 1]); + } + } + return res; + } + + public static final void recordJobStatus(String label, Job job, Map<String,Object> results) { + Map<String,Object> jobs = (Map<String,Object>)results.get(Nutch.STAT_JOBS); + if (jobs == null) { + jobs = new LinkedHashMap<String,Object>(); + results.put(Nutch.STAT_JOBS, jobs); + } + Map<String,Object> stats = new HashMap<String,Object>(); + Map<String,Object> countStats = new HashMap<String,Object>(); + try { + Counters counters = job.getCounters(); + for (CounterGroup cg : counters) { + Map<String,Object> cnts = new HashMap<String,Object>(); + countStats.put(cg.getDisplayName(), cnts); + for (Counter c : cg) { + cnts.put(c.getName(), c.getValue()); + } + } + } catch (Exception e) { + countStats.put("error", e.toString()); + } + stats.put(Nutch.STAT_COUNTERS, 
countStats); + stats.put("jobName", job.getJobName()); + stats.put("jobID", job.getJobID()); + if (label == null) { + label = job.getJobName(); + if (job.getJobID() != null) { + label = label + "-" + job.getJobID(); + } + } + jobs.put(label, stats); + } +} Propchange: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java?rev=1039014&r1=1039013&r2=1039014&view=diff ============================================================================== --- nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java (original) +++ nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java Thu Nov 25 12:05:57 2010 @@ -1,28 +1,35 @@ package org.apache.nutch.api; +import static org.junit.Assert.*; + import java.util.HashMap; import java.util.Map; import org.apache.nutch.api.JobManager.JobType; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.util.NutchTool; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; import org.restlet.ext.jackson.JacksonRepresentation; import org.restlet.representation.Representation; import org.restlet.resource.ClientResource; -import junit.framework.TestCase; - -public class TestAPI extends TestCase { +public class TestAPI { - NutchServer server; + private static NutchServer server; ClientResource cli; - String baseUrl = "http://localhost:8192/nutch/"; + private static String baseUrl = "http://localhost:8192/nutch/"; - public void setUp() throws Exception { + @BeforeClass + public static void before() throws Exception { server = new NutchServer(8192); server.start(); } - public void tearDown() throws Exception { + @AfterClass + public static void after() throws Exception { if (!server.stop(false)) { for (int i = 1; i < 11; i++) { System.err.println("Waiting 
for jobs to complete - " + i + "s"); @@ -41,13 +48,16 @@ public class TestAPI extends TestCase { } } + @Test public void testInfoAPI() throws Exception { ClientResource cli = new ClientResource(baseUrl); - String expected = "[[\"confs\",\"Configuration manager\"],[\"jobs\",\"Job manager\"]]"; + String expected = "[[\"admin\",\"Service admin actions\"],[\"confs\",\"Configuration manager\"],[\"db\",\"DB data streaming\"],[\"jobs\",\"Job manager\"]]"; String got = cli.get().getText(); assertEquals(expected, got); } + @SuppressWarnings("rawtypes") + @Test public void testConfsAPI() throws Exception { ClientResource cli = new ClientResource(baseUrl + ConfResource.PATH); assertEquals("[\"default\"]", cli.get().getText()); @@ -70,6 +80,8 @@ public class TestAPI extends TestCase { assertEquals("[\"default\"]", cli.get().getText()); } + @SuppressWarnings("rawtypes") + @Test public void testJobsAPI() throws Exception { ClientResource cli = new ClientResource(baseUrl + JobResource.PATH); assertEquals("[]", cli.get().getText()); @@ -77,8 +89,6 @@ public class TestAPI extends TestCase { Map<String,Object> map = new HashMap<String,Object>(); map.put(Params.JOB_TYPE, JobType.READDB.toString()); map.put(Params.CONF_ID, "default"); - JacksonRepresentation<Map<String,Object>> jr = - new JacksonRepresentation<Map<String,Object>>(map); Representation r = cli.put(map); String jobId = r.getText(); assertNotNull(jobId); @@ -90,5 +100,101 @@ public class TestAPI extends TestCase { String state = (String)list[0].get("state"); assertEquals(jobId, id); assertEquals(state, "RUNNING"); + int cnt = 10; + do { + try { + Thread.sleep(2000); + } catch (Exception e) {}; + list = cli.get(Map[].class); + state = (String)list[0].get("state"); + if (!state.equals("RUNNING")) { + break; + } + } while (--cnt > 0); + assertTrue(cnt > 0); + if (list == null) return; + for (Map m : list) { + System.out.println(m); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testStopKill() throws 
Exception { + ClientResource cli = new ClientResource(baseUrl + JobResource.PATH); + // create + Map<String,Object> map = new HashMap<String,Object>(); + map.put(Params.JOB_TYPE, JobType.CLASS.toString()); + Map<String,Object> args = new HashMap<String,Object>(); + map.put(Params.ARGS, args); + args.put(Nutch.ARG_CLASS, SpinningJob.class.getName()); + map.put(Params.CONF_ID, "default"); + Representation r = cli.put(map); + String jobId = r.getText(); + cli.release(); + assertNotNull(jobId); + System.out.println(jobId); + assertTrue(jobId.startsWith("default-CLASS-")); + ClientResource stopCli = new ClientResource(baseUrl + JobResource.PATH + + "?job=" + jobId + "&cmd=stop"); + r = stopCli.get(); + assertEquals("true", r.getText()); + stopCli.release(); + Thread.sleep(2000); // wait for the job to finish + ClientResource jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId); + Map<String,Object> res = jobCli.get(Map.class); + res = (Map<String,Object>)res.get("result"); + assertEquals("stopped", res.get("res")); + jobCli.release(); + // restart and kill + r = cli.put(map); + jobId = r.getText(); + cli.release(); + assertNotNull(jobId); + System.out.println(jobId); + assertTrue(jobId.startsWith("default-CLASS-")); + ClientResource killCli = new ClientResource(baseUrl + JobResource.PATH + + "?job=" + jobId + "&cmd=abort"); + r = killCli.get(); + assertEquals("true", r.getText()); + killCli.release(); + Thread.sleep(2000); // wait for the job to finish + jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId); + res = jobCli.get(Map.class); + res = (Map<String,Object>)res.get("result"); + assertEquals("killed", res.get("res")); + jobCli.release(); + } + + public static class SpinningJob extends NutchTool { + volatile boolean shouldStop = false; + + @Override + public Map<String, Object> run(Map<String, Object> args) throws Exception { + status.put(Nutch.STAT_MESSAGE, "running"); + int cnt = 60; + while (!shouldStop && cnt-- > 0) { + 
Thread.sleep(1000); + } + if (cnt == 0) { + results.put("res", "failed"); + } + return results; + } + + @Override + public boolean stopJob() throws Exception { + results.put("res", "stopped"); + shouldStop = true; + return true; + } + + @Override + public boolean killJob() throws Exception { + results.put("res", "killed"); + shouldStop = true; + return true; + } + } }
