Author: vgritsenko
Date: Wed Oct 20 05:47:33 2004
New Revision: 55147
Modified:
cocoon/trunk/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
Log:
Resolver should be nulled out in recycle() only after all threads have finished
(and have released their sources)
Modified:
cocoon/trunk/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
==============================================================================
---
cocoon/trunk/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
(original)
+++
cocoon/trunk/src/blocks/scratchpad/java/org/apache/cocoon/transformation/IncludeTransformer.java
Wed Oct 20 05:47:33 2004
@@ -93,13 +93,14 @@
* than in series, in one thread. This parameter can be set in either the
transformer
* definition (to affect all IncludeTransformer instances):</p>
* <pre>
- * <parallel>true</parallel>
+ * <parallel>true</parallel>
* </pre>
+ *
* <p>or in a pipeline itself (to only affect that instance of the
IncludeTransformer):</p>
* <pre>
- * <map:parameter name="parallel" value="true"/>
+ * <map:parameter name="parallel" value="true"/>
* </pre>
- * <p>It defaults to "false".</p>
+ * <p>By default, parallel processing is turned off.</p>
*
* @cocoon.sitemap.component.name include
* @cocoon.sitemap.component.logger sitemap.transformer.include
@@ -133,6 +134,9 @@
/** <p>The encoding to use for parameter names and values.</p> */
private static final String ENCODING = "US-ASCII";
+ //
+ // Global configuration
+ //
/** <p>The {@link ServiceManager} instance associated with this
instance.</p> */
private ServiceManager m_manager;
@@ -187,9 +191,15 @@
/**
* <p>The IncludeBuffer that is used for buffering events if parallel
* processing is turned on</p>
+ * <p>This object is also used as a lock for thread counter m_threads</p>
*/
private SaxBuffer m_buffer;
+ /**
+ * <p>Inclusion threads/tasks counter (if executing in parallel)</p>
+ */
+ private int m_threads;
+
/**
* <p>Create a new {@link IncludeTransformer} instance.</p>
@@ -241,12 +251,21 @@
*/
public void recycle() {
this.m_namespaces = null;
- this.m_resolver = null;
this.m_validity = null;
this.x_parameters = null;
this.x_value = null;
- this.m_buffering = false;
- this.m_buffer = null;
+
+ if (this.m_buffering) {
+ // Wait for threads to complete and release Sources
+ waitForThreads();
+ this.m_buffering = false;
+ this.m_buffer = null;
+ }
+
+ // Resolver can be nulled out when all threads completed processing
+ // and released their Sources.
+ this.m_resolver = null;
+
super.recycle();
}
@@ -455,7 +474,9 @@
/* Check for parallel processing */
if (this.m_parallel) {
this.m_buffering = true;
- if (m_buffer == null) m_buffer = new SaxBuffer();
+ if (m_buffer == null) {
+ m_buffer = new SaxBuffer();
+ }
m_buffer.xmlizable(new IncludeBuffer(source));
} else {
SourceUtil.toSAX(this.m_manager, source, "text/xml",
@@ -546,6 +567,44 @@
}
/**
+ * Increment active threads counter
+ */
+ int incrementThreads() {
+ synchronized (m_buffer) {
+ return ++m_threads;
+ }
+ }
+
+ /**
+ * Decrement active threads counter
+ */
+ void decrementThreads() {
+ synchronized (m_buffer) {
+ if (--m_threads <= 0) {
+ m_buffer.notify();
+ }
+ }
+ }
+
+ /**
+ * Wait until there are no active threads
+ */
+ private void waitForThreads() {
+ synchronized (m_buffer) {
+ if (m_threads > 0) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(m_threads + " threads in progress,
waiting");
+ }
+
+ try {
+ m_buffer.wait();
+ } catch (InterruptedException ignored) { }
+ // Don't continue waiting if interrupted.
+ }
+ }
+ }
+
+ /**
* Buffer for loading included source in separate thread.
* Streaming of the loaded buffer is possible only when the source is
* loaded completely. If loading is not complete, toSAX method
@@ -558,36 +617,56 @@
public IncludeBuffer(Source source) {
this.source = source;
+
// FIXME Need thread pool component. Based on
EDU.oswego.cs.dl.util.concurrent.PooledExecutor.
// See also
org.apache.cocoon.components.cron.QuartzJobScheduler.ThreadPool
- Thread t = new Thread(this);
- t.setName("IncludeSource#" + source.getURI());
- t.setDaemon(true);
- t.start();
+ try {
+ Thread t = new Thread(this);
+ t.setName("IncludeSource#" + source.getURI());
+ t.setDaemon(true);
+ t.start();
+ } catch (RuntimeException e) {
+ // In case we failed to spawn a thread
+ this.e = new SAXException(e);
+ m_resolver.release(source);
+ throw e;
+ }
}
- public synchronized void toSAX(ContentHandler contentHandler)
+ /**
+ * Stream content of this buffer when it is loaded completely.
+ * This method blocks if loading is not complete.
+ */
+ public void toSAX(ContentHandler contentHandler)
throws SAXException {
- if (!finished) {
- try {
- wait();
- } catch (InterruptedException ignored) { }
+ synchronized (this) {
+ if (!this.finished) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) { }
+ // Don't continue waiting if interrupted.
+ }
}
- if (e != null) {
- throw e;
+ if (this.e != null) {
+ throw this.e;
}
super.toSAX(contentHandler);
}
+ /**
+ * Load content of the source into this buffer.
+ */
public void run() {
+ // Increment active threads counter
+ int t = incrementThreads();
if (getLogger().isDebugEnabled()) {
- getLogger().debug("Loading <" + source.getURI() + ">");
+ getLogger().debug("Thread #" + t + " loading <" +
source.getURI() + ">");
}
try {
- SourceUtil.toSAX(m_manager, source, "text/xml", new
EmbeddedXMLPipe(this));
+ SourceUtil.toSAX(m_manager, this.source, "text/xml", new
EmbeddedXMLPipe(this));
} catch (Exception e) {
if (!(e instanceof SAXException)) {
this.e = new SAXException(e);
@@ -596,18 +675,20 @@
}
} finally {
synchronized (this) {
- finished = true;
+ this.finished = true;
notify();
}
- m_resolver.release(source);
+ // Release source and decrement active threads counter
+ m_resolver.release(this.source);
+ decrementThreads();
}
if (getLogger().isDebugEnabled()) {
if (this.e == null) {
- getLogger().debug("Loaded <" + source.getURI() + ">");
+ getLogger().debug("Thread #" + t + " loaded <" +
source.getURI() + ">");
} else {
- getLogger().debug("Failed to load <" + source.getURI() +
">", this.e);
+ getLogger().debug("Thread #" + t + " failed to load <" +
source.getURI() + ">", this.e);
}
}
}