Author: bdelacretaz
Date: Wed Jul 14 12:03:01 2010
New Revision: 964017

URL: http://svn.apache.org/viewvc?rev=964017&view=rev
Log:
SLING-550 - SuspendableOutputStream will be used to suspend/stop background 
servlets

Added:
    
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
   (with props)
    sling/trunk/contrib/extensions/bgservlets/src/test/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/
    
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/
    
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/
    
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
   (with props)
Modified:
    sling/trunk/contrib/extensions/bgservlets/pom.xml

Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=964017&r1=964016&r2=964017&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/pom.xml (original)
+++ sling/trunk/contrib/extensions/bgservlets/pom.xml Wed Jul 14 12:03:01 2010
@@ -97,5 +97,10 @@
       <artifactId>org.apache.felix.webconsole</artifactId>
       <version>3.0.0</version>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

Added: 
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java?rev=964017&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
 (added)
+++ 
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
 Wed Jul 14 12:03:01 2010
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+
+/** Wraps an OutputStream with controls for suspending it or
+ *     throwing an IOException next time it is written to.
+ *     Used to suspend background servlets (by blocking the 
+ *     stream) or stop them (by throwing an exception).
+ */
+public class SuspendableOutputStream extends FilterOutputStream implements 
JobStatus {
+       private State state = State.RUNNING;
+       private boolean closed = false;
+       
+       @SuppressWarnings("serial")
+       public static class StreamStoppedException extends IOException {
+               StreamStoppedException() {
+                       super("Stopped by " + 
SuspendableOutputStream.class.getSimpleName());
+               }
+       }
+       
+       public SuspendableOutputStream(OutputStream os) {
+               super(os);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               checkWritePermission();
+               super.write(b, off, len);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               checkWritePermission();
+               super.write(b);
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+               checkWritePermission();
+               super.write(b);
+       }
+       
+       @Override
+       public void close() throws IOException {
+               super.close();
+               state = State.DONE;
+               closed = true;
+       }
+       
+       private void checkWritePermission() throws IOException {
+               if(closed) {
+                       throw new IOException("Attempt to write to closed 
stream");
+               }
+               
+               if(state == State.STOP_REQUESTED || state == State.STOPPED) {
+                       state = State.STOPPED;
+                       // stopped: throw exception to stop stream user
+                       flush();
+                       throw new StreamStoppedException();
+                       
+               } else if(state == State.SUSPEND_REQUESTED || state == 
State.SUSPENDED) {
+                       synchronized (this) {
+                               if(state == State.SUSPEND_REQUESTED || state == 
State.SUSPENDED)
+                                       state = State.SUSPENDED;
+                                       try {
+                                               // suspended: block until 
resumed
+                                               wait();
+                                       } catch (InterruptedException e) {
+                                               throw new 
IOException("InterruptedException in checkWritePermission()", e);
+                                       }
+                       }
+               }
+       }
+
+
+       public State getState() {
+               return state;
+       }
+
+       /** Only SUSPENDED, STOP, and RUNNING make sense here */ 
+       public synchronized void requestStateChange(State s) {
+               boolean illegal = false;
+               
+               if(s == State.SUSPENDED) {
+                       if(state == State.RUNNING) {
+                               state = State.SUSPEND_REQUESTED;
+                       } else if(state == State.SUSPEND_REQUESTED || state == 
State.SUSPENDED) {
+                               // ignore change
+                       } else {
+                               illegal = true;
+                       }
+                       
+               } else if(s == State.STOPPED) {
+                       if(state == State.RUNNING) {
+                               state = State.STOP_REQUESTED;
+                       } else if (state == State.STOP_REQUESTED || state == 
State.STOPPED) {
+                               // ignore change
+                       } else {
+                               illegal = true;
+                       }
+                       
+               } else if(s == State.RUNNING) {
+                       if(state == State.SUSPEND_REQUESTED || state == 
State.SUSPENDED) {
+                               state = State.RUNNING;
+                               notify();
+                       }
+               }
+               
+               if(illegal) {
+                       throw new IllegalStateException("Illegal state change:" 
+ state + " -> " + s);
+               }
+       }
+
+       /** Not implemented
+        *      @throws UnsupportedOperationException */
+       public String getPath() {
+               throw new UnsupportedOperationException("getPath() is not 
applicable to this class");
+       }
+}

Propchange: 
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: 
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java?rev=964017&view=auto
==============================================================================
--- 
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
 (added)
+++ 
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
 Wed Jul 14 12:03:01 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+import org.junit.Test;
+
+public class SuspendableOutputStreamTest {
+       public final static String TEST = 
"0123456789abcdefghijklmnopqrstuvwxyz";
+       
+       static class WriterThread extends Thread {
+               private final OutputStream os;
+               private final byte [] toWrite = "TEST".getBytes();
+               private Exception runException;
+               final static int WRITE_DELAY = 50;
+               
+               WriterThread(OutputStream os) {
+                       this.os = os;
+               }
+               
+               @Override
+               public void run() {
+                       try {
+                               while(true) {
+                                       os.write(toWrite);
+                                       Thread.sleep(WRITE_DELAY);
+                               }
+                       } catch(Exception e) {
+                               runException = e;
+                       }
+               }
+       }
+       
+       @Test
+       public void testNoSuspend() throws IOException {
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               final SuspendableOutputStream f = new 
SuspendableOutputStream(bos);
+               f.write(TEST.getBytes());
+               f.flush();
+               assertEquals("String should be fully written", TEST, 
bos.toString());
+       }
+       
+       @Test
+       public void testStop() throws IOException {
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               final SuspendableOutputStream f = new 
SuspendableOutputStream(bos);
+               assertEquals("Expecting RUNNING state first", 
JobStatus.State.RUNNING, f.getState());
+               f.write(TEST.getBytes());
+               f.flush();
+               
+               f.requestStateChange(JobStatus.State.STOPPED);
+               assertEquals("Expecting STOP_REQUESTED state before write", 
JobStatus.State.STOP_REQUESTED, f.getState());
+               try {
+                       f.write("nothing".getBytes());
+                       fail("Expected StreamStoppedException when writing to 
STOPPED stream");
+               } catch(SuspendableOutputStream.StreamStoppedException 
asExpected) {
+               }
+               
+               assertEquals("Expecting STOPPED state after write", 
JobStatus.State.STOPPED, f.getState());
+       }
+       
+       @Test
+       public void testSuspend() throws Exception {
+               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               final SuspendableOutputStream f = new 
SuspendableOutputStream(bos);
+               final WriterThread t = new WriterThread(f);
+               t.setDaemon(true);
+               t.start();
+               
+               final long delay = WriterThread.WRITE_DELAY * 3;
+               Thread.sleep(delay);
+               assertTrue("Expecting data to be written by WriterThread", 
bos.size() > 0);
+               
+               f.requestStateChange(JobStatus.State.SUSPENDED);
+               Thread.sleep(delay);
+               assertEquals("Expecting SUSPEND state after a few writes", 
JobStatus.State.SUSPENDED, f.getState());
+               
+               final int count = bos.size();
+               Thread.sleep(delay);
+               assertEquals("Expecting no writes in SUSPEND state", count, 
bos.size());
+               
+               f.requestStateChange(JobStatus.State.RUNNING);
+               Thread.sleep(delay);
+               assertEquals("Expecting RUNNING state", 
JobStatus.State.RUNNING, f.getState());
+               assertTrue("Expecting data to be written after resuming", 
bos.size() > count);
+               
+               f.close();
+               Thread.sleep(delay);
+               assertFalse("Expecting WriterThread to end after closing 
stream", t.isAlive());
+               assertNotNull("Expecting non-null Exception in WriterThread", 
t.runException);
+               assertTrue("Expecting IOException in WriterThread", 
t.runException instanceof IOException);
+       }
+}

Propchange: 
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL


Reply via email to