Author: bdelacretaz
Date: Wed Jul 14 12:03:01 2010
New Revision: 964017
URL: http://svn.apache.org/viewvc?rev=964017&view=rev
Log:
SLING-550 - SuspendableOutputStream will be used to suspend/stop background
servlets
Added:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
(with props)
sling/trunk/contrib/extensions/bgservlets/src/test/
sling/trunk/contrib/extensions/bgservlets/src/test/java/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
(with props)
Modified:
sling/trunk/contrib/extensions/bgservlets/pom.xml
Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=964017&r1=964016&r2=964017&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/pom.xml (original)
+++ sling/trunk/contrib/extensions/bgservlets/pom.xml Wed Jul 14 12:03:01 2010
@@ -97,5 +97,10 @@
<artifactId>org.apache.felix.webconsole</artifactId>
<version>3.0.0</version>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
Added:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java?rev=964017&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
(added)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
Wed Jul 14 12:03:01 2010
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+
+/** Wraps an OutputStream with controls for suspending it or
+ * throwing an IOException next time it is written to.
+ * Used to suspend background servlets (by blocking the
+ * stream) or stop them (by throwing an exception).
+ */
+public class SuspendableOutputStream extends FilterOutputStream implements
JobStatus {
+ private State state = State.RUNNING;
+ private boolean closed = false;
+
+ @SuppressWarnings("serial")
+ public static class StreamStoppedException extends IOException {
+ StreamStoppedException() {
+ super("Stopped by " +
SuspendableOutputStream.class.getSimpleName());
+ }
+ }
+
+ public SuspendableOutputStream(OutputStream os) {
+ super(os);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkWritePermission();
+ super.write(b, off, len);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ checkWritePermission();
+ super.write(b);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ checkWritePermission();
+ super.write(b);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ state = State.DONE;
+ closed = true;
+ }
+
+ private void checkWritePermission() throws IOException {
+ if(closed) {
+ throw new IOException("Attempt to write to closed
stream");
+ }
+
+ if(state == State.STOP_REQUESTED || state == State.STOPPED) {
+ state = State.STOPPED;
+ // stopped: throw exception to stop stream user
+ flush();
+ throw new StreamStoppedException();
+
+ } else if(state == State.SUSPEND_REQUESTED || state ==
State.SUSPENDED) {
+ synchronized (this) {
+ if(state == State.SUSPEND_REQUESTED || state ==
State.SUSPENDED)
+ state = State.SUSPENDED;
+ try {
+ // suspended: block until
resumed
+ wait();
+ } catch (InterruptedException e) {
+ throw new
IOException("InterruptedException in checkWritePermission()", e);
+ }
+ }
+ }
+ }
+
+
+ public State getState() {
+ return state;
+ }
+
+ /** Only SUSPENDED, STOP, and RUNNING make sense here */
+ public synchronized void requestStateChange(State s) {
+ boolean illegal = false;
+
+ if(s == State.SUSPENDED) {
+ if(state == State.RUNNING) {
+ state = State.SUSPEND_REQUESTED;
+ } else if(state == State.SUSPEND_REQUESTED || state ==
State.SUSPENDED) {
+ // ignore change
+ } else {
+ illegal = true;
+ }
+
+ } else if(s == State.STOPPED) {
+ if(state == State.RUNNING) {
+ state = State.STOP_REQUESTED;
+ } else if (state == State.STOP_REQUESTED || state ==
State.STOPPED) {
+ // ignore change
+ } else {
+ illegal = true;
+ }
+
+ } else if(s == State.RUNNING) {
+ if(state == State.SUSPEND_REQUESTED || state ==
State.SUSPENDED) {
+ state = State.RUNNING;
+ notify();
+ }
+ }
+
+ if(illegal) {
+ throw new IllegalStateException("Illegal state change:"
+ state + " -> " + s);
+ }
+ }
+
+ /** Not implemented
+ * @throws UnsupportedOperationException */
+ public String getPath() {
+ throw new UnsupportedOperationException("getPath() is not
applicable to this class");
+ }
+}
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Added:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java?rev=964017&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
(added)
+++
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
Wed Jul 14 12:03:01 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+import org.junit.Test;
+
+/** Tests writing to a {@link SuspendableOutputStream}, stopping it,
+ *  and suspending/resuming it while another thread writes to it.
+ */
+public class SuspendableOutputStreamTest {
+    public final static String TEST = "0123456789abcdefghijklmnopqrstuvwxyz";
+
+    /** Writes the TEST bytes to a stream every WRITE_DELAY msec,
+     *  until a write or the sleep fails. The failure is kept in
+     *  runException.
+     */
+    static class WriterThread extends Thread {
+        private final OutputStream os;
+        private final byte [] toWrite = TEST.getBytes();
+        private Exception runException;
+        final static int WRITE_DELAY = 50;
+
+        WriterThread(OutputStream os) {
+            this.os = os;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    os.write(toWrite);
+                    Thread.sleep(WRITE_DELAY);
+                }
+            } catch (Exception e) {
+                runException = e;
+            }
+        }
+    }
+
+    /** In the initial state, data goes through to the wrapped stream. */
+    @Test
+    public void testNoSuspend() throws IOException {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+        f.write(TEST.getBytes());
+        f.flush();
+        assertEquals("String should be fully written", TEST, bos.toString());
+    }
+
+    /** A stop request is only pending until the next write, which must fail. */
+    @Test
+    public void testStop() throws IOException {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+        assertEquals("Expecting RUNNING state first", JobStatus.State.RUNNING, f.getState());
+        f.write(TEST.getBytes());
+        f.flush();
+
+        f.requestStateChange(JobStatus.State.STOPPED);
+        assertEquals("Expecting STOP_REQUESTED state before write", JobStatus.State.STOP_REQUESTED, f.getState());
+        try {
+            f.write("nothing".getBytes());
+            fail("Expected StreamStoppedException when writing to STOPPED stream");
+        } catch (SuspendableOutputStream.StreamStoppedException asExpected) {
+            // this is what we want
+        }
+
+        assertEquals("Expecting STOPPED state after write", JobStatus.State.STOPPED, f.getState());
+    }
+
+    /** Suspends and resumes the stream while a WriterThread writes to it,
+     *  then closes the stream and expects the WriterThread to end with
+     *  an IOException.
+     */
+    @Test
+    public void testSuspend() throws Exception {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+        final WriterThread t = new WriterThread(f);
+        t.setDaemon(true);
+        t.start();
+
+        // long enough for the WriterThread to go through a few write cycles
+        final long delay = WriterThread.WRITE_DELAY * 3;
+        Thread.sleep(delay);
+        assertTrue("Expecting data to be written by WriterThread", bos.size() > 0);
+
+        f.requestStateChange(JobStatus.State.SUSPENDED);
+        Thread.sleep(delay);
+        assertEquals("Expecting SUSPEND state after a few writes", JobStatus.State.SUSPENDED, f.getState());
+
+        final int count = bos.size();
+        Thread.sleep(delay);
+        assertEquals("Expecting no writes in SUSPEND state", count, bos.size());
+
+        f.requestStateChange(JobStatus.State.RUNNING);
+        Thread.sleep(delay);
+        assertEquals("Expecting RUNNING state", JobStatus.State.RUNNING, f.getState());
+        assertTrue("Expecting data to be written after resuming", bos.size() > count);
+
+        f.close();
+        // wait for the WriterThread to end, but not longer than the
+        // delay that is used everywhere else in this test
+        t.join(delay);
+        assertFalse("Expecting WriterThread to end after closing stream", t.isAlive());
+        assertNotNull("Expecting non-null Exception in WriterThread", t.runException);
+        assertTrue("Expecting IOException in WriterThread", t.runException instanceof IOException);
+    }
+}
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL