bloritsch 02/01/28 10:42:26
Modified: src/scratchpad/org/apache/avalon/excalibur/command
CommandManager.java
Added: src/scratchpad/org/apache/avalon/excalibur/command
EventPipeline.java TPCThreadManager.java
TPSPThreadManager.java ThreadManager.java
Log:
first stab at CommandManager/ThreadManager
Revision Changes Path
1.2 +171 -22
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/CommandManager.java
Index: CommandManager.java
===================================================================
RCS file:
/home/cvs/jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/CommandManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- CommandManager.java 28 Jan 2002 16:01:00 -0000 1.1
+++ CommandManager.java 28 Jan 2002 18:42:25 -0000 1.2
@@ -7,34 +7,43 @@
*/
package org.apache.avalon.excalibur.command;
+import org.apache.avalon.excalibur.collections.Buffer;
+import org.apache.avalon.excalibur.collections.VariableSizeBuffer;
+import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.event.DefaultQueue;
+import org.apache.avalon.excalibur.event.EventHandler;
import org.apache.avalon.excalibur.event.Queue;
import org.apache.avalon.excalibur.event.QueueElement;
import org.apache.avalon.excalibur.event.Signal;
-import org.apache.avalon.excalibur.event.EventHandler;
+import org.apache.avalon.excalibur.event.Sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
/**
* The CommandManager handles asynchronous commands from the rest of the
system.
* The only exposed piece is the Queue that other components use to give
Commands
- * to this system.
+ * to this system. You <strong>must</strong> register this with a
ThreadManager
+ * for it to work.
*
* @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
*/
-public class CommandManager implements EventHandler
+public class CommandManager implements EventPipeline
{
- private final Queue m_queue = new DefaultQueue();
- //private final ThreadManager;
+ private final Queue m_queue = new DefaultQueue();
+ private final HashMap m_signalHandlers = new HashMap();
+ private final Mutex m_mutex = new Mutex();
+ private final EventHandler m_eventHandler = new CommandEventHandler(
+ Collections.unmodifiableMap( m_signalHandlers ) );
+ private final Sink[] m_sinks = new Sink[] { m_queue };
public CommandManager()
{
}
- /*
- public CommandManager( ThreadManager threadManager )
- {
- }
- */
-
public final Queue getCommandQueue()
{
return m_queue;
@@ -42,35 +51,175 @@
public final void registerSignalHandler( Signal signal, EventHandler
handler )
{
+ try
+ {
+ m_mutex.acquire();
+ ArrayList handlers = (ArrayList) m_signalHandlers.get(
signal.getClass() );
+
+ if ( null == handlers )
+ {
+ handlers = new ArrayList();
+ }
+
+ if ( ! handlers.contains( handler ) )
+ {
+ handlers.add( handler );
+
+ m_signalHandlers.put( signal.getClass(), handlers );
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ // ignore for now
+ }
+ finally
+ {
+ m_mutex.release();
+ }
}
public final void deregisterSignalHandler( Signal signal, EventHandler
handler )
{
- }
+ try
+ {
+ m_mutex.acquire();
+ ArrayList handlers = (ArrayList) m_signalHandlers.get(
signal.getClass() );
- public final void handleEvents( QueueElement[] elements )
- {
- for (int i = 0; i < elements.length; i++)
+ if ( null != handlers )
+ {
+ if ( handlers.remove( handler ) )
+ {
+ m_signalHandlers.put( signal.getClass(), handlers );
+ }
+
+ if ( 0 == handlers.size() )
+ {
+ m_signalHandlers.remove( signal.getClass() );
+ }
+ }
+ }
+ catch (InterruptedException ie)
{
- handleEvent( elements[i] );
+ // ignore for now
}
+ finally
+ {
+ m_mutex.release();
+ }
+ }
+
+ public final Sink[] getSinks()
+ {
+ return m_sinks;
}
- public final void handleEvent( QueueElement element )
+ public final EventHandler getEventHandler()
{
+ return m_eventHandler;
}
- private final static class Runner implements Runnable
+ private final static class CommandEventHandler implements EventHandler
{
- private final Queue m_queue;
+ private final Map m_signalHandlers;
+ private final Buffer m_delayedCommands = new VariableSizeBuffer();
+
+ protected CommandEventHandler( Map signalHandlers )
+ {
+ m_signalHandlers = signalHandlers;
+ }
- private Runner( Queue queue )
+ public final void handleEvents( QueueElement[] elements )
{
- m_queue = queue;
+ for (int i = 0; i < elements.length; i++)
+ {
+ handleEvent( elements[i] );
+ }
+
+ int size = m_delayedCommands.size();
+ for (int i = 0; i < size; i++)
+ {
+ DelayedCommandInfo command = (DelayedCommandInfo)
m_delayedCommands.remove();
+
+ if ( System.currentTimeMillis() >= command.m_nextRunTime )
+ {
+ try
+ {
+ command.m_command.execute();
+ } catch (Exception e)
+ {
+ // ignore for now
+ }
+
+ command.m_numExecutions++;
+
+ if ( command.m_repeatable )
+ {
+ RepeatedCommand cmd = (RepeatedCommand)
command.m_command;
+
+ if ( cmd.getNumberOfRepeats() <
command.m_numExecutions )
+ {
+ command.m_nextRunTime =
System.currentTimeMillis() +
+ cmd.getRepeatInterval();
+ m_delayedCommands.add( command );
+ }
+ }
+ }
+ }
}
- public void run()
+ public final void handleEvent( QueueElement element )
{
+ if ( ! ( element instanceof Signal ) )
+ {
+ return;
+ }
+
+ if ( ! ( element instanceof Command ) )
+ {
+ ArrayList handlers = (ArrayList) m_signalHandlers.get(
element.getClass() );
+
+ if ( null != handlers )
+ {
+ Iterator i = handlers.iterator();
+
+ while ( i.hasNext() )
+ {
+ EventHandler handler = (EventHandler) i.next();
+ handler.handleEvent( element );
+ }
+ }
+
+ return;
+ }
+
+ if ( element instanceof DelayedCommand )
+ {
+ DelayedCommandInfo commandInfo = new DelayedCommandInfo();
+ commandInfo.m_command = (DelayedCommand) element;
+ commandInfo.m_nextRunTime = System.currentTimeMillis() +
+ commandInfo.m_command.getDelayInterval();
+ commandInfo.m_numExecutions = 0;
+ commandInfo.m_repeatable = element instanceof
RepeatedCommand;
+
+ m_delayedCommands.add( commandInfo );
+ return;
+ }
+
+ try
+ {
+ ((Command) element).execute();
+ } catch (Exception e)
+ {
+ // ignore for now
+ }
}
+ }
+
+ private final static class DelayedCommandInfo
+ {
+ protected DelayedCommand m_command;
+ protected long m_nextRunTime;
+ protected int m_numExecutions;
+ protected boolean m_repeatable;
}
}
1.1
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/EventPipeline.java
Index: EventPipeline.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE.txt file.
*/
package org.apache.avalon.excalibur.command;
import org.apache.avalon.excalibur.event.Sink;
import org.apache.avalon.excalibur.event.EventHandler;
/**
* An EventPipeline is used by the ThreadManager to manage the event Queue and
* EventHandler relationship. The ThreadManager manages the automatic
* forwarding of the Events from the queue to the Event Handler.
*
* <p>An implementation only exposes the two ends of the pipeline: the
* {@link Sink}s that events are read from and the single
* {@link EventHandler} that receives them.</p>
*
* @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
*/
public interface EventPipeline
{
/**
* There can be many different sinks to merge into a pipeline. For the
* CommandManager, there is only one sink.
*
* @return the sinks whose events feed this pipeline
*/
Sink[] getSinks();
/**
* Returns the reference to the EventHandler that the events from all the
* Sinks get merged into.
*
* @return the handler that receives the events taken from every sink
*/
EventHandler getEventHandler();
}
1.1
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/TPCThreadManager.java
Index: TPCThreadManager.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE.txt file.
*/
package org.apache.avalon.excalibur.command;
import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.thread.*;
import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
import org.apache.avalon.excalibur.event.Sink;
import org.apache.avalon.excalibur.event.EventHandler;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
/**
 * This is a ThreadManager that uses a certain number of threads per
 * processor.  The number of threads in the pool is a direct proportion to the
 * number of processors; one additional thread is requested because the
 * manager's own dispatch loop ({@link #run()}) is executed on the same pool.
 *
 * <p>The dispatch loop wakes up every <code>sleepTime</code> milliseconds and
 * hands one {@link PipelineRunner} per registered pipeline to the pool.</p>
 *
 * <p>NOTE(review): this class provides <code>register</code>,
 * <code>deregister</code> and <code>deregisterAll</code> but does not declare
 * <code>implements ThreadManager</code>, and the methods are package-private;
 * confirm whether that is intentional.</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
 */
public final class TPCThreadManager implements Runnable
{
    private final ThreadPool    m_threadPool;
    private final Mutex         m_mutex = new Mutex();
    private final HashMap       m_pipelines = new HashMap();
    private       ThreadControl m_threadControl;
    // Written under m_mutex but read by the dispatch loop without it, so it
    // has to be volatile for the loop to see a shutdown request.
    private volatile boolean    m_done = false;
    private final long          m_sleepTime;

    /**
     * The default constructor assumes there is a system property named
     * "os.arch.cpus" that has a default for the number of CPUs on a system.
     * Otherwise, the value is 1.
     */
    public TPCThreadManager()
    {
        this( Integer.parseInt( System.getProperty( "os.arch.cpus", "1" ) ), 1 );
    }

    /**
     * This constructor assumes there is a parameter named "os.arch.cpus"
     * that has a default for the number of CPUs on a system.  Otherwise, the
     * value is 1.
     *
     * @param params the parameters to read "os.arch.cpus" from
     */
    public TPCThreadManager( Parameters params )
    {
        this( params.getParameterAsInteger( "os.arch.cpus", 1 ), 1 );
    }

    /**
     * Constructor provides one thread per number of processors.
     *
     * @param numProcessors the number of processors
     */
    public TPCThreadManager( int numProcessors )
    {
        this( numProcessors, 1 );
    }

    /**
     * Constructor provides a specified number of threads per processor.  If
     * either value is less than one, then the value is rewritten as one.
     * The dispatch loop sleeps 1000 milliseconds between passes.
     *
     * @param numProcessors       the number of processors
     * @param threadsPerProcessor the number of threads per processor
     */
    public TPCThreadManager( int numProcessors, int threadsPerProcessor )
    {
        this( numProcessors, threadsPerProcessor, 1000 );
    }

    /**
     * Constructor provides a specified number of threads per processor.  If
     * either value is less than one, then the value is rewritten as one.
     *
     * @param numProcessors       the number of processors
     * @param threadsPerProcessor the number of threads per processor
     * @param sleepTime           milliseconds the dispatch loop sleeps
     *                            between two passes over the pipelines
     */
    public TPCThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime )
    {
        int processors = Math.max( numProcessors, 1 );
        int threads = Math.max( threadsPerProcessor, 1 );

        m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
            ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L );

        m_sleepTime = sleepTime;

        // Must stay the last statement: the dispatch loop starts running on
        // a pool thread as soon as it is submitted.
        m_threadControl = m_threadPool.execute( this );
    }

    /**
     * Register an EventPipeline with the ThreadManager.  If the dispatch loop
     * was stopped by an earlier deregistration it is started again.
     *
     * <p>If the calling thread is interrupted while waiting for the internal
     * mutex, the pipeline is not registered and the thread's interrupt
     * status is restored.</p>
     *
     * @param pipeline the pipeline to service
     */
    void register( EventPipeline pipeline )
    {
        if ( ! lock() )
        {
            return;
        }

        try
        {
            m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );

            if ( m_done )
            {
                // The flag has to be cleared before the loop is resubmitted,
                // otherwise the new run() would return immediately.
                // NOTE(review): if the previous loop has not yet observed
                // the stop request it keeps running as well - confirm
                // whether a stricter hand-over is needed.
                m_done = false;
                m_threadControl = m_threadPool.execute( this );
            }
        }
        finally
        {
            m_mutex.release();
        }
    }

    /**
     * Deregister an EventPipeline with the ThreadManager.  When the last
     * pipeline is removed the dispatch loop is asked to stop and this method
     * waits up to one second for it to finish.
     *
     * @param pipeline the pipeline to stop servicing
     */
    void deregister( EventPipeline pipeline )
    {
        if ( ! lock() )
        {
            return;
        }

        ThreadControl dispatcher = null;

        try
        {
            m_pipelines.remove( pipeline );

            if ( m_pipelines.isEmpty() )
            {
                m_done = true;
                dispatcher = m_threadControl;
            }
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * Deregisters all EventPipelines from this ThreadManager, asks the
     * dispatch loop to stop and waits up to one second for it to finish.
     */
    void deregisterAll()
    {
        if ( ! lock() )
        {
            return;
        }

        ThreadControl dispatcher;

        try
        {
            m_done = true;
            m_pipelines.clear();
            dispatcher = m_threadControl;
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * The dispatch loop: until a stop is requested, submit every registered
     * pipeline's runner to the pool and then sleep for the configured time.
     */
    public void run()
    {
        while ( ! m_done )
        {
            dispatchPipelines();

            try
            {
                Thread.sleep( m_sleepTime );
            }
            catch ( InterruptedException ie )
            {
                // Ignore and continue processing.  The interrupt status is
                // deliberately not restored here: doing so would make every
                // following sleep() fail immediately and spin the loop.
            }
        }
    }

    /**
     * Acquires the internal mutex on behalf of a caller of the registration
     * methods.
     *
     * @return <code>true</code> if the mutex is now held and must be
     *         released by the caller; <code>false</code> if the wait was
     *         interrupted (the interrupt status has been restored)
     */
    private boolean lock()
    {
        try
        {
            m_mutex.acquire();
            return true;
        }
        catch ( InterruptedException ie )
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Waits up to one second for the given dispatch loop to finish.  Called
     * without the mutex held so the loop can complete its current pass.
     */
    private void awaitDispatcher( ThreadControl dispatcher )
    {
        if ( null == dispatcher )
        {
            return;
        }

        try
        {
            dispatcher.join( 1000 );
        }
        catch ( InterruptedException ie )
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Performs one pass of the dispatch loop: submits the runner of every
     * registered pipeline to the pool.  The pass is skipped if the wait for
     * the mutex is interrupted.
     */
    private void dispatchPipelines()
    {
        try
        {
            m_mutex.acquire();
        }
        catch ( InterruptedException ie )
        {
            // The mutex was not obtained, so there is nothing to release;
            // skip this pass and let the loop try again after its sleep.
            return;
        }

        try
        {
            // NOTE(review): a runner is resubmitted on every pass whether or
            // not its previous execution has finished - confirm that the
            // pipelines' handlers tolerate overlapping invocations.
            Iterator i = m_pipelines.values().iterator();

            while ( i.hasNext() )
            {
                m_threadPool.execute( (PipelineRunner) i.next() );
            }
        }
        finally
        {
            m_mutex.release();
        }
    }

    /**
     * Runnable that drains every sink of one pipeline and passes the
     * dequeued elements to the pipeline's event handler.
     */
    public final static class PipelineRunner implements Runnable
    {
        private final EventPipeline m_pipeline;

        protected PipelineRunner( EventPipeline pipeline )
        {
            m_pipeline = pipeline;
        }

        public void run()
        {
            Sink[] sinks = m_pipeline.getSinks();
            EventHandler handler = m_pipeline.getEventHandler();

            for ( int i = 0; i < sinks.length; i++ )
            {
                handler.handleEvents( sinks[i].dequeueAll() );
            }
        }
    }
}
1.1
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/TPSPThreadManager.java
Index: TPSPThreadManager.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE.txt file.
*/
package org.apache.avalon.excalibur.command;
import org.apache.avalon.framework.parameters.Parameters;
import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.thread.*;
import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
import org.apache.avalon.excalibur.event.Sink;
import org.apache.avalon.excalibur.event.EventHandler;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
/**
 * This is a ThreadManager which provides a threadpool per Sink per
 * EventPipeline.
 *
 * <p>NOTE(review): the implementation below uses a single shared pool and a
 * dispatch loop that submits one {@link PipelineRunner} per registered
 * pipeline every <code>sleepTime</code> milliseconds; confirm whether the
 * per-sink pooling described above is still to be implemented.</p>
 *
 * <p>NOTE(review): this class provides <code>register</code>,
 * <code>deregister</code> and <code>deregisterAll</code> but does not declare
 * <code>implements ThreadManager</code>, and the methods are package-private;
 * confirm whether that is intentional.</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
 */
public final class TPSPThreadManager implements Runnable
{
    private final ThreadPool    m_threadPool;
    private final Mutex         m_mutex = new Mutex();
    private final HashMap       m_pipelines = new HashMap();
    private       ThreadControl m_threadControl;
    // Written under m_mutex but read by the dispatch loop without it, so it
    // has to be volatile for the loop to see a shutdown request.
    private volatile boolean    m_done = false;
    private final long          m_sleepTime;

    /**
     * The default constructor uses one processor, one thread per processor
     * and a sleep time of 1000 milliseconds between dispatch passes.
     */
    public TPSPThreadManager()
    {
        this( 1, 1, 1000 );
    }

    /**
     * Constructor provides a specified number of threads per processor.  If
     * either value is less than one, then the value is rewritten as one.
     * One additional thread is requested because the manager's own dispatch
     * loop is executed on the same pool.
     *
     * @param numProcessors       the number of processors
     * @param threadsPerProcessor the number of threads per processor
     * @param sleepTime           milliseconds the dispatch loop sleeps
     *                            between two passes over the pipelines
     */
    public TPSPThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime )
    {
        int processors = Math.max( numProcessors, 1 );
        int threads = Math.max( threadsPerProcessor, 1 );

        // The pool is named after this class; it was previously labelled
        // "TPCThreadManager", which made its threads indistinguishable from
        // those of the other manager.
        m_threadPool = new ResourceLimitingThreadPool( "TPSPThreadManager",
            ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L );

        m_sleepTime = sleepTime;

        // Must stay the last statement: the dispatch loop starts running on
        // a pool thread as soon as it is submitted.
        m_threadControl = m_threadPool.execute( this );
    }

    /**
     * Register an EventPipeline with the ThreadManager.  If the dispatch loop
     * was stopped by an earlier deregistration it is started again.
     *
     * <p>If the calling thread is interrupted while waiting for the internal
     * mutex, the pipeline is not registered and the thread's interrupt
     * status is restored.</p>
     *
     * @param pipeline the pipeline to service
     */
    void register( EventPipeline pipeline )
    {
        if ( ! lock() )
        {
            return;
        }

        try
        {
            m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );

            if ( m_done )
            {
                // The flag has to be cleared before the loop is resubmitted,
                // otherwise the new run() would return immediately.
                // NOTE(review): if the previous loop has not yet observed
                // the stop request it keeps running as well - confirm
                // whether a stricter hand-over is needed.
                m_done = false;
                m_threadControl = m_threadPool.execute( this );
            }
        }
        finally
        {
            m_mutex.release();
        }
    }

    /**
     * Deregister an EventPipeline with the ThreadManager.  When the last
     * pipeline is removed the dispatch loop is asked to stop and this method
     * waits up to one second for it to finish.
     *
     * @param pipeline the pipeline to stop servicing
     */
    void deregister( EventPipeline pipeline )
    {
        if ( ! lock() )
        {
            return;
        }

        ThreadControl dispatcher = null;

        try
        {
            m_pipelines.remove( pipeline );

            if ( m_pipelines.isEmpty() )
            {
                m_done = true;
                dispatcher = m_threadControl;
            }
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * Deregisters all EventPipelines from this ThreadManager, asks the
     * dispatch loop to stop and waits up to one second for it to finish.
     */
    void deregisterAll()
    {
        if ( ! lock() )
        {
            return;
        }

        ThreadControl dispatcher;

        try
        {
            m_done = true;
            m_pipelines.clear();
            dispatcher = m_threadControl;
        }
        finally
        {
            m_mutex.release();
        }

        awaitDispatcher( dispatcher );
    }

    /**
     * The dispatch loop: until a stop is requested, submit every registered
     * pipeline's runner to the pool and then sleep for the configured time.
     */
    public void run()
    {
        while ( ! m_done )
        {
            dispatchPipelines();

            try
            {
                Thread.sleep( m_sleepTime );
            }
            catch ( InterruptedException ie )
            {
                // Ignore and continue processing.  The interrupt status is
                // deliberately not restored here: doing so would make every
                // following sleep() fail immediately and spin the loop.
            }
        }
    }

    /**
     * Acquires the internal mutex on behalf of a caller of the registration
     * methods.
     *
     * @return <code>true</code> if the mutex is now held and must be
     *         released by the caller; <code>false</code> if the wait was
     *         interrupted (the interrupt status has been restored)
     */
    private boolean lock()
    {
        try
        {
            m_mutex.acquire();
            return true;
        }
        catch ( InterruptedException ie )
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Waits up to one second for the given dispatch loop to finish.  Called
     * without the mutex held so the loop can complete its current pass.
     */
    private void awaitDispatcher( ThreadControl dispatcher )
    {
        if ( null == dispatcher )
        {
            return;
        }

        try
        {
            dispatcher.join( 1000 );
        }
        catch ( InterruptedException ie )
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Performs one pass of the dispatch loop: submits the runner of every
     * registered pipeline to the pool.  The pass is skipped if the wait for
     * the mutex is interrupted.
     */
    private void dispatchPipelines()
    {
        try
        {
            m_mutex.acquire();
        }
        catch ( InterruptedException ie )
        {
            // The mutex was not obtained, so there is nothing to release;
            // skip this pass and let the loop try again after its sleep.
            return;
        }

        try
        {
            // NOTE(review): a runner is resubmitted on every pass whether or
            // not its previous execution has finished - confirm that the
            // pipelines' handlers tolerate overlapping invocations.
            Iterator i = m_pipelines.values().iterator();

            while ( i.hasNext() )
            {
                m_threadPool.execute( (PipelineRunner) i.next() );
            }
        }
        finally
        {
            m_mutex.release();
        }
    }

    /**
     * Runnable that drains every sink of one pipeline and passes the
     * dequeued elements to the pipeline's event handler.
     */
    public final static class PipelineRunner implements Runnable
    {
        private final EventPipeline m_pipeline;

        protected PipelineRunner( EventPipeline pipeline )
        {
            m_pipeline = pipeline;
        }

        public void run()
        {
            Sink[] sinks = m_pipeline.getSinks();
            EventHandler handler = m_pipeline.getEventHandler();

            for ( int i = 0; i < sinks.length; i++ )
            {
                handler.handleEvents( sinks[i].dequeueAll() );
            }
        }
    }
}
1.1
jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/command/ThreadManager.java
Index: ThreadManager.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE.txt file.
*/
package org.apache.avalon.excalibur.command;
/**
* A ThreadManager handles the thread policies for EventPipelines. It works
* hand in hand with the CommandManager, and can be expanded to work with a
* SEDA like architecture.
*
* <p>NOTE(review): TPCThreadManager and TPSPThreadManager, added in the same
* commit, define methods with these names but do not declare that they
* implement this interface - confirm the intended relationship.</p>
*
* @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a>
*/
public interface ThreadManager
{
/**
* Register an EventPipeline with the ThreadManager.
*
* @param pipeline the pipeline to register
*/
void register( EventPipeline pipeline );
/**
* Deregister an EventPipeline with the ThreadManager
*
* @param pipeline the pipeline to deregister
*/
void deregister( EventPipeline pipeline );
/**
* Deregisters all EventPipelines from this ThreadManager
*/
void deregisterAll();
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>