Author: bdelacretaz
Date: Wed Jul 14 13:24:08 2010
New Revision: 964041
URL: http://svn.apache.org/viewvc?rev=964041&view=rev
Log:
SLING-550 - suspend/resume/stop jobs from webconsole, work in progress
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/BackgroundTestServlet.java
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/FilterChainExecutionJob.java
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/ServletResponseWrapper.java
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/webconsole/ExecutionEngineConsolePlugin.java
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/BackgroundTestServlet.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/BackgroundTestServlet.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/BackgroundTestServlet.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/BackgroundTestServlet.java
Wed Jul 14 13:24:08 2010
@@ -29,6 +29,8 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
import org.apache.sling.api.servlets.SlingSafeMethodsServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Servlet used for interactive testing of the background
* servlets engine.
@@ -40,6 +42,8 @@ import org.apache.sling.api.servlets.Sli
@Property(name="sling.servlet.paths", value="/system/bgservlets/test")
public class BackgroundTestServlet extends SlingSafeMethodsServlet {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
@Override
protected void doGet(SlingHttpServletRequest request,
SlingHttpServletResponse response) throws
ServletException,
@@ -51,20 +55,24 @@ public class BackgroundTestServlet exten
final int interval = getIntParam(request, "interval", 1);
final int flushEvery = getIntParam(request, "flushEvery", 2);
- for(int i=1; i <= cycles; i++) {
- if(i % flushEvery == 0) {
- w.println("Flushing output");
- w.flush();
- }
- w.printf("Cycle %d of %d\n", i, cycles);
- try {
- Thread.sleep(interval * 1000);
- } catch(InterruptedException iex) {
- throw new
ServletException("InterruptedException", iex);
+ try {
+ for(int i=1; i <= cycles; i++) {
+ if(i % flushEvery == 0) {
+ w.println("Flushing output");
+ w.flush();
+ }
+ w.printf("Cycle %d of %d\n", i, cycles);
+ try {
+ Thread.sleep(interval * 1000);
+ } catch(InterruptedException iex) {
+ throw new
ServletException("InterruptedException", iex);
+ }
}
+ w.println("All done.");
+ w.flush();
+ } catch(Throwable t) {
+ log.info("Exception in doGet", t);
}
- w.println("All done.");
- w.flush();
}
private int getIntParam(SlingHttpServletRequest request, String name,
int defaultValue) {
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/FilterChainExecutionJob.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/FilterChainExecutionJob.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/FilterChainExecutionJob.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/FilterChainExecutionJob.java
Wed Jul 14 13:24:08 2010
@@ -18,6 +18,8 @@
*/
package org.apache.sling.bgservlets.impl;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import javax.servlet.FilterChain;
@@ -35,8 +37,8 @@ class FilterChainExecutionJob implements
private final Logger log = LoggerFactory.getLogger(getClass());
private final FilterChain chain;
private final ServletResponseWrapper response;
+ private final SuspendableOutputStream stream;
private final String path;
- private State state = State.NEW;
// TODO is it ok to keep a reference to the request until run() is
called??
private final HttpServletRequest request;
@@ -44,12 +46,17 @@ class FilterChainExecutionJob implements
FilterChainExecutionJob(FilterChain chain, HttpServletRequest request,
HttpServletResponse hsr) throws IOException {
this.chain = chain;
this.request = request;
- response = new ServletResponseWrapper(hsr);
- path = response.getOutputPath();
+
+ // TODO write output to the Sling repository. For now: just a
temp file
+ final File output =
File.createTempFile(getClass().getSimpleName(), ".data");
+ output.deleteOnExit();
+ path = output.getAbsolutePath();
+ stream = new SuspendableOutputStream(new
FileOutputStream(output));
+ response = new ServletResponseWrapper(hsr, stream);
}
public String toString() {
- return getClass().getSimpleName() + ", state=" + state + ",
path=" + path;
+ return getClass().getSimpleName() + ", state=" + getState() +
", path=" + path;
}
public void run() {
@@ -73,11 +80,10 @@ class FilterChainExecutionJob implements
}
public State getState() {
- return state;
+ return stream.getState();
}
public void requestStateChange(State s) {
- // TODO need some validity checks
- state = s;
+ stream.requestStateChange(s);
}
}
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/ServletResponseWrapper.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/ServletResponseWrapper.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/ServletResponseWrapper.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/ServletResponseWrapper.java
Wed Jul 14 13:24:08 2010
@@ -18,8 +18,6 @@
*/
package org.apache.sling.bgservlets.impl;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -32,15 +30,14 @@ import javax.servlet.http.HttpServletRes
/** Wraps an HttpServletResponse for background processing */
class ServletResponseWrapper extends HttpServletResponseWrapper {
- private final String outputPath;
private final ServletOutputStream stream;
private final PrintWriter writer;
- static class CustomOutputStream extends ServletOutputStream {
+ static class ServletOutputStreamWrapper extends ServletOutputStream {
private final OutputStream os;
- CustomOutputStream(OutputStream os) {
+ ServletOutputStreamWrapper(OutputStream os) {
this.os = os;
}
@@ -61,24 +58,12 @@ class ServletResponseWrapper extends Htt
}
- ServletResponseWrapper(HttpServletResponse response) throws IOException
{
+ ServletResponseWrapper(HttpServletResponse response, OutputStream os)
throws IOException {
super(response);
- // TODO write output to the Sling repository. For now: just a
temp file
- final File output =
File.createTempFile(getClass().getSimpleName(), ".data");
- output.deleteOnExit();
- outputPath = output.getAbsolutePath();
- stream = new CustomOutputStream(new FileOutputStream(output));
+ stream = new ServletOutputStreamWrapper(os);
writer = new PrintWriter(new OutputStreamWriter(stream));
}
- public String toString() {
- return getClass().getName() + ":" + outputPath;
- }
-
- String getOutputPath() {
- return outputPath;
- }
-
void cleanup() throws IOException {
stream.flush();
stream.close();
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
Wed Jul 14 13:24:08 2010
@@ -30,11 +30,11 @@ import org.apache.sling.bgservlets.JobSt
* stream) or stop them (by throwing an exception).
*/
public class SuspendableOutputStream extends FilterOutputStream implements
JobStatus {
- private State state = State.RUNNING;
+ private State state = State.NEW;
private boolean closed = false;
@SuppressWarnings("serial")
- public static class StreamStoppedException extends IOException {
+ public static class StreamStoppedException extends RuntimeException {
StreamStoppedException() {
super("Stopped by " +
SuspendableOutputStream.class.getSimpleName());
}
@@ -86,7 +86,9 @@ public class SuspendableOutputStream ext
state = State.SUSPENDED;
try {
// suspended: block until
resumed
- wait();
+ while(state == State.SUSPENDED)
{
+ wait();
+ }
} catch (InterruptedException e) {
throw new
IOException("InterruptedException in checkWritePermission()", e);
}
@@ -103,9 +105,12 @@ public class SuspendableOutputStream ext
public synchronized void requestStateChange(State s) {
boolean illegal = false;
- if(s == State.SUSPENDED) {
- if(state == State.RUNNING) {
+ if(state == State.DONE) {
+ // ignore changes
+ } else if(s == State.SUSPENDED) {
+ if(state == State.NEW || state == State.QUEUED || state
== State.RUNNING) {
state = State.SUSPEND_REQUESTED;
+ notify();
} else if(state == State.SUSPEND_REQUESTED || state ==
State.SUSPENDED) {
// ignore change
} else {
@@ -113,8 +118,10 @@ public class SuspendableOutputStream ext
}
} else if(s == State.STOPPED) {
- if(state == State.RUNNING) {
+ if(state == State.NEW || state == State.QUEUED || state
== State.RUNNING
+ || state == State.SUSPEND_REQUESTED ||
state == State.SUSPENDED) {
state = State.STOP_REQUESTED;
+ notify();
} else if (state == State.STOP_REQUESTED || state ==
State.STOPPED) {
// ignore change
} else {
@@ -126,6 +133,10 @@ public class SuspendableOutputStream ext
state = State.RUNNING;
notify();
}
+
+ } else {
+ state = s;
+ notify();
}
if(illegal) {
Modified:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/webconsole/ExecutionEngineConsolePlugin.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/webconsole/ExecutionEngineConsolePlugin.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/webconsole/ExecutionEngineConsolePlugin.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/webconsole/ExecutionEngineConsolePlugin.java
Wed Jul 14 13:24:08 2010
@@ -106,6 +106,7 @@ public class ExecutionEngineConsolePlugi
public String getTitle() {
return TITLE;
}
+
@Override
protected void renderContent(HttpServletRequest req,
HttpServletResponse res)
throws ServletException, IOException {
@@ -115,17 +116,65 @@ public class ExecutionEngineConsolePlugi
pw.println("No ExecutionEngine service found");
return;
}
+
+ // TODO should use POST
+ final String jobPath = req.getParameter("jobPath");
+ if(jobPath != null) {
+ final JobStatus job = ee.getJobStatus(jobPath);
+ if(job != null) {
+ final String action = req.getParameter("action");
+ if("suspend".equals(action)) {
+
job.requestStateChange(JobStatus.State.SUSPENDED);
+ } else if("stop".equals(action)) {
+ job.requestStateChange(JobStatus.State.STOPPED);
+ } else if("resume".equals(action)) {
+ job.requestStateChange(JobStatus.State.RUNNING);
+ }
+ }
+ }
+
+ pw.println("TODO: provide a way to cleanup old jobs<br/>");
+ pw.println("TODO: optionally list active jobs only<br/>");
- pw.println("ExecutionEngine jobs:<br/>");
- pw.println("<pre>");
+ pw.println("<table class='content' cellpadding='0' cellspacing='0'
width='100%'>");
+ pw.println("<thead>");
+ pw.println("<tr class='content'>");
+ pw.println("<th class='content container'>Controls</th>");
+ pw.println("<th class='content container'>State</th>");
+ pw.println("<th class='content container'>Path</th>");
+ pw.println("</tr>");
+ pw.println("</thead>");
+ pw.println("<tbody>");
+
final Iterator<JobStatus> it = ee.getMatchingJobStatus(null);
int count = 0;
while(it.hasNext()) {
- pw.println(it.next());
+ renderJobStatus(pw, it.next());
count++;
}
- pw.println("</pre>");
+ pw.println("</tbody>");
+ pw.println("</table>");
pw.println("Total <b>" + count + "</b> jobs.<br />");
}
+
+ private void renderJobStatus(PrintWriter pw, JobStatus job) {
+ // TODO should use POST
+ pw.println("<tr class='content'>");
+ pw.println("<td><form action='./" + LABEL + "' method='GET'>");
+ final String [] actions = { "suspend", "resume", "stop" };
+ for(String action : actions) {
+ pw.println("<input type='submit' name='action' value='"
+ action + "'/> ");
+ }
+ pw.println("<input type='hidden' name='jobPath' value='" +
job.getPath() + "'/> ");
+ pw.println("</form></td>");
+ pw.println("<td>");
+ pw.println(job.getState());
+ pw.println("</td>");
+ pw.println("<td>");
+ pw.println(job.getPath());
+ pw.println("</td>");
+ pw.println("</tr>");
+ }
+
}
}
\ No newline at end of file
Modified:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java?rev=964041&r1=964040&r2=964041&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
(original)
+++
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
Wed Jul 14 13:24:08 2010
@@ -70,7 +70,8 @@ public class SuspendableOutputStreamTest
public void testStop() throws IOException {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final SuspendableOutputStream f = new
SuspendableOutputStream(bos);
- assertEquals("Expecting RUNNING state first",
JobStatus.State.RUNNING, f.getState());
+ assertEquals("Expecting NEW state first", JobStatus.State.NEW,
f.getState());
+ f.requestStateChange(JobStatus.State.RUNNING);
f.write(TEST.getBytes());
f.flush();
@@ -89,6 +90,7 @@ public class SuspendableOutputStreamTest
public void testSuspend() throws Exception {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final SuspendableOutputStream f = new
SuspendableOutputStream(bos);
+ f.requestStateChange(JobStatus.State.RUNNING);
final WriterThread t = new WriterThread(f);
t.setDaemon(true);
t.start();
@@ -116,4 +118,43 @@ public class SuspendableOutputStreamTest
assertNotNull("Expecting non-null Exception in WriterThread",
t.runException);
assertTrue("Expecting IOException in WriterThread",
t.runException instanceof IOException);
}
+
+ @Test
+ public void testSuspendThenStop() throws Exception {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ final SuspendableOutputStream f = new
SuspendableOutputStream(bos);
+ assertEquals("Expecting NEW state first", JobStatus.State.NEW,
f.getState());
+ final WriterThread t = new WriterThread(f);
+ t.setDaemon(true);
+ t.start();
+
+ f.requestStateChange(JobStatus.State.SUSPENDED);
+
+ final long delay = WriterThread.WRITE_DELAY * 3;
+ Thread.sleep(delay);
+ assertEquals("Expecting SUSPEND state after a few writes",
JobStatus.State.SUSPENDED, f.getState());
+
+ f.requestStateChange(JobStatus.State.STOPPED);
+ assertEquals("Expecting STOP_REQUESTED state before write",
JobStatus.State.STOP_REQUESTED, f.getState());
+ try {
+ f.write("nothing".getBytes());
+ fail("Expected StreamStoppedException when writing to
STOPPED stream");
+ } catch(SuspendableOutputStream.StreamStoppedException
asExpected) {
+ }
+
+ assertEquals("Expecting STOPPED state after write",
JobStatus.State.STOPPED, f.getState());
+ f.close();
+ }
+
+ @Test
+ public void testDone() {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ final SuspendableOutputStream f = new
SuspendableOutputStream(bos);
+ f.requestStateChange(JobStatus.State.DONE);
+ assertEquals("Expecting DONE state (1)", JobStatus.State.DONE,
f.getState());
+ f.requestStateChange(JobStatus.State.SUSPENDED);
+ assertEquals("Expecting DONE state (2)", JobStatus.State.DONE,
f.getState());
+ f.requestStateChange(JobStatus.State.STOPPED);
+ assertEquals("Expecting DONE state (3)", JobStatus.State.DONE,
f.getState());
+ }
}