Author: bdelacretaz
Date: Tue Jul 20 15:25:28 2010
New Revision: 965878
URL: http://svn.apache.org/viewvc?rev=965878&view=rev
Log:
SLING-550 - JCR-backed permanent streams, will be used to store the output of
background servlets
Added:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
(with props)
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
(with props)
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
(with props)
Modified:
sling/trunk/contrib/extensions/bgservlets/pom.xml
Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=965878&r1=965877&r2=965878&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/pom.xml (original)
+++ sling/trunk/contrib/extensions/bgservlets/pom.xml Tue Jul 20 15:25:28 2010
@@ -97,10 +97,20 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.testing</artifactId>
+ <version>2.0.4-incubator</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
<dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
Added:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java?rev=965878&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
(added)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.RepositoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Reads data stored by a {...@link NodeOutputStream}
+ * and rebuilds a continuous stream out of the
+ * multiple Properties that it creates.
+ */
+public class NodeInputStream extends InputStream {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ /** The Node under which we read our data */
+ private final Node node;
+
+ /** Counter used to build the name of Property from
+ * which we currently read */
+ private int counter;
+
+ /** Current stream that we are reading */
+ private InputStream currentStream;
+
+ NodeInputStream(Node n) throws IOException {
+ node = n;
+ selectNextStream();
+ }
+
+ /** Select next property to read from and open its stream */
+ private void selectNextStream() throws IOException {
+ counter++;
+ final String name = NodeOutputStream.STREAM_PROPERTY_NAME_PREFIX +
counter;
+ try {
+ if(node.hasProperty(name)) {
+ final Property p = node.getProperty(name);
+ currentStream = p.getStream();
+ log.debug("Switched to the InputStream of Property {}",
p.getPath());
+ } else {
+ currentStream = null;
+ log.debug("Property {} not found, end of stream",
node.getPath() + "/" + name);
+ }
+ } catch(RepositoryException re) {
+ throw new IOException("RepositoryException in
selectNextProperty()", re);
+ }
+ }
+
+ @Override
+ public int available() throws IOException {
+ return currentStream == null ? 0 : currentStream.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(currentStream != null) {
+ currentStream.close();
+ }
+ super.close();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if(currentStream == null) {
+ return -1;
+ }
+ int result = currentStream.read();
+ if(result == -1) {
+ selectNextStream();
+ return read();
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if(currentStream == null) {
+ return 0;
+ }
+ int result = currentStream.read(b, off, len);
+ if(result == 0) {
+ selectNextStream();
+ return read(b, off, len);
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+}
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Added:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java?rev=965878&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
(added)
+++
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An OutputStream stored in properties under a JCR node. The content is
+ * persisted on each flush() call, using sequentially-named properties so
+ * that {@link NodeInputStream} can reconstruct the stream from permanent
+ * storage. flush() is also called automatically once BUFFER_SWITCH_SIZE
+ * bytes are buffered, to keep our memory requirements low.
+ *
+ * Meant to be used when running background servlets: we want to save
+ * their output in a way that survives system restart.
+ */
+public class NodeOutputStream extends OutputStream {
+
+    /** Prefix for Property names used to store our streams */
+    public static final String STREAM_PROPERTY_NAME_PREFIX = "_NODE_STREAM_";
+
+    /** Initial capacity, in bytes, of the in-memory buffer */
+    public static final int BUFFER_SIZE = 32768;
+
+    /**
+     * Number of buffered bytes at or above which a write triggers an
+     * automatic flush.
+     * NOTE(review): this evaluates to 36408, which is larger than
+     * BUFFER_SIZE, so the buffer has to grow before the first automatic
+     * flush - confirm whether BUFFER_SIZE * 90 / 100 was intended.
+     */
+    public static final int BUFFER_SWITCH_SIZE = BUFFER_SIZE * 100 / 90;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** The Node under which we write our data */
+    private final Node node;
+
+    /** Number of the last Property that was successfully saved */
+    private int counter;
+
+    /** Buffer to hold data before writing it to a Property */
+    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(BUFFER_SIZE);
+
+    /**
+     * @param n the Node under which the stream Properties are created
+     */
+    public NodeOutputStream(Node n) {
+        node = n;
+    }
+
+    /** Calls flush to persist our stream, before closing */
+    @Override
+    public void close() throws IOException {
+        flush();
+        buffer.close();
+    }
+
+    /**
+     * Stores the contents of our buffer to a new Property under our node,
+     * numbered sequentially, and saves the node. Does nothing if the
+     * buffer is empty, so that no empty Property is created.
+     *
+     * @throws IOException if a RepositoryException occurs (kept as the
+     *         cause); the buffered data and the Property numbering are
+     *         left untouched in that case, so the call can be retried
+     */
+    @Override
+    public void flush() throws IOException {
+        final int size = buffer.size();
+        if (size == 0) {
+            return;
+        }
+
+        // The counter only advances once the data is saved: a failed
+        // write must not leave a gap in the Property sequence, as
+        // NodeInputStream stops at the first missing Property.
+        final int next = counter + 1;
+        final String name = STREAM_PROPERTY_NAME_PREFIX + next;
+        try {
+            node.setProperty(name, new ByteArrayInputStream(buffer.toByteArray()));
+            node.save();
+            counter = next;
+            buffer.reset();
+            // Guarded, as building the message requires a repository lookup
+            if (log.isDebugEnabled()) {
+                log.debug("Saved {} bytes to Property {}", size, node.getProperty(name).getPath());
+            }
+        } catch (RepositoryException re) {
+            throw new IOException("RepositoryException in flush()", re);
+        }
+    }
+
+    /** Flushes if the buffer has reached BUFFER_SWITCH_SIZE bytes */
+    private void flushIfNeeded() throws IOException {
+        if (buffer.size() >= BUFFER_SWITCH_SIZE) {
+            log.debug("Buffer size {} reached switch level {}, flushing", buffer.size(), BUFFER_SWITCH_SIZE);
+            flush();
+        }
+    }
+
+    /**
+     * Buffers len bytes of b starting at off, then flushes if the buffer
+     * has reached BUFFER_SWITCH_SIZE bytes.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        buffer.write(b, off, len);
+        flushIfNeeded();
+    }
+
+    /** Same as {@code write(b, 0, b.length)}. */
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Buffers a single byte, then flushes if the buffer has reached
+     * BUFFER_SWITCH_SIZE bytes.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        buffer.write(b);
+        flushIfNeeded();
+    }
+}
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Added:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java?rev=965878&view=auto
==============================================================================
---
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
(added)
+++
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jcr.Node;
+
+import org.apache.sling.commons.testing.jcr.RepositoryTestBase;
+
+/**
+ * Round-trip tests: data written with a NodeOutputStream must be read
+ * back unchanged with a NodeInputStream.
+ */
+public class NodeStreamTest extends RepositoryTestBase {
+
+    public static final String ASCII_DATA = "0123456789abcdefgjijklmnoprqstuvwxyz";
+    public static final byte [] BINARY_DATA = getBinaryData();
+    public static final String NAME_PREFIX = "testNode";
+
+    /** Suffix used to build the names of the test nodes */
+    private int counter;
+
+    /**
+     * Compares both streams byte by byte, and verifies that they end at
+     * the same offset.
+     */
+    private void assertStream(InputStream expected, InputStream actual) throws IOException {
+        int offset = 0;
+        for (int exp = expected.read(); exp != -1; exp = expected.read()) {
+            assertEquals("Expecting same data at offset " + offset, exp, actual.read());
+            offset++;
+        }
+        assertEquals("Expecting end of actual stream at offset " + offset, -1, actual.read());
+    }
+
+    /** Creates and saves a new node under the test root node */
+    private Node createTestNode() throws Exception {
+        final Node result = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        result.getSession().save();
+        return result;
+    }
+
+    /** Writes data to the supplied node with a single write call, then closes the stream */
+    private void writeToNode(Node testNode, byte [] data) throws IOException {
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        nos.write(data);
+        nos.close();
+    }
+
+    /**
+     * Reads the supplied node back and compares its content with the
+     * expected data. The NodeInputStream is always closed, so that the
+     * test does not leak the underlying Property stream.
+     */
+    private void assertNodeContent(byte [] expected, Node testNode) throws IOException {
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        try {
+            assertStream(new ByteArrayInputStream(expected), nis);
+        } finally {
+            nis.close();
+        }
+    }
+
+    public void testAsciiWriteAndRead() throws Exception {
+        // Platform charset on purpose: the same bytes are written and expected
+        final byte [] data = ASCII_DATA.getBytes();
+        final Node testNode = createTestNode();
+        writeToNode(testNode, data);
+        assertNodeContent(data, testNode);
+    }
+
+    public void testBinaryWriteAndRead() throws Exception {
+        final Node testNode = createTestNode();
+        writeToNode(testNode, BINARY_DATA);
+        assertNodeContent(BINARY_DATA, testNode);
+    }
+
+    public void testBigBinaryWriteAndRead() throws Exception {
+        final int FACTOR = 20;
+        final byte [] data = bigData(BINARY_DATA, FACTOR);
+        assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length, data.length);
+
+        final Node testNode = createTestNode();
+        writeToNode(testNode, data);
+
+        assertFalse("Expecting no pending changes in testNode session",
+                testNode.getSession().hasPendingChanges());
+
+        assertNodeContent(data, testNode);
+    }
+
+    public void testMultipleBinaryWrites() throws Exception {
+        final int FACTOR = 20;
+        final Node testNode = createTestNode();
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        for (int i = 0; i < FACTOR; i++) {
+            nos.write(BINARY_DATA);
+        }
+        nos.close();
+
+        assertFalse("Expecting no pending changes in testNode session",
+                testNode.getSession().hasPendingChanges());
+        final long propCount = testNode.getProperties().getSize();
+        final long expect = 10;
+        assertTrue("Expecting > " + expect + " properties on test node", propCount > expect);
+
+        assertNodeContent(bigData(BINARY_DATA, FACTOR), testNode);
+    }
+
+    public void testWriteWithOffset() throws Exception {
+        final int FACTOR = 20;
+        final byte [] data = bigData(BINARY_DATA, FACTOR);
+        assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length, data.length);
+
+        final Node testNode = createTestNode();
+
+        // Write in chunks of 1271 bytes, the last chunk being shorter
+        final int step = 1271;
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        int offset = 0;
+        while (offset < data.length) {
+            final int len = Math.min(step, data.length - offset);
+            nos.write(data, offset, len);
+            offset += len;
+        }
+        nos.close();
+
+        assertNodeContent(data, testNode);
+    }
+
+    /** Returns the supplied data repeated multiplier times */
+    private byte[] bigData(byte [] data, int multiplier) {
+        final byte [] result = new byte[data.length * multiplier];
+        for (int i = 0; i < multiplier; i++) {
+            System.arraycopy(data, 0, result, i * data.length, data.length);
+        }
+        return result;
+    }
+
+    /** Returns 66000 bytes, holding the low-order byte of each index */
+    private static byte [] getBinaryData() {
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        for (int i = 0; i < 66000; i++) {
+            os.write(i);
+        }
+        return os.toByteArray();
+    }
+}
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL