Preliminary implementation:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.sun.jini.thread;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * This Executor does not block on tasks, nor does it grow excessively;
 * instead, when under peak load, the calling thread executes the task.
 *
 * It is the caller's responsibility to know the sequence in which the
 * tasks must be executed.
 *
 * This Executor sets the executor thread's name and ContextClassLoader
 * based on the name argument given to execute and the ContextClassLoader
 * of the calling thread; both are restored after execution of the task
 * is complete.
 *
 * @author peter
 */
final class ImprovedThreadPool implements Executor {

    private final ThreadPoolExecutor executor;
    /**
     * @param threadGroup the ThreadGroup in which new worker threads are created
     * @param blocking_coefficient proportion of tasks expected to block:
     *        0 for CPU intensive work up to 0.9 for IO intensive work.
     */
    ImprovedThreadPool(ThreadGroup threadGroup, double blocking_coefficient) {
        if (blocking_coefficient < 0)
            throw new IllegalArgumentException(
                    "blocking coefficient can't be less than zero");
        if (blocking_coefficient >= 1)
            throw new IllegalArgumentException(
                    "blocking coefficient can't be one or greater");
        // The blocking coefficient is the proportion of tasks that might block:
        // 0 CPU intensive to 0.9 IO intensive.
        int numberOfCores = Runtime.getRuntime().availableProcessors();
        int poolSizeLimit = (int) (numberOfCores / (1 - blocking_coefficient));
        // The intent here is to parallelise security checks, as well as weed
        // out blocking SocketPermission checks and execute them in parallel
        // to reduce the wait on network IO.
        // Once the pool size has reached its maximum, tasks are handed back
        // to the calling thread to execute; this strategy also eliminates the
        // possibility of deadlock caused by circular dependencies between
        // permission checks.
        executor = new ThreadPoolExecutor(numberOfCores, poolSizeLimit, 20L,
                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                new Factory(threadGroup),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
    public void execute(Runnable runnable, String name) {
        // Wrap the task so the executing thread's name and context class
        // loader are set for the duration of the task, then restored.
        executor.execute(new Task(runnable, name));
    }
    /**
     * Task simply encapsulates a task's Runnable object with its name.
     */
    private static class Task implements Runnable {
        final Runnable runnable;
        final SetThreadState taskState;

        Task(Runnable runnable, String name) {
            this.runnable = runnable;
            // caller is null if a SecurityException was thrown.
            ThreadState caller =
                    AccessController.doPrivileged(new GetThreadState());
            ThreadState state = new ThreadState(name,
                    caller != null ? caller.getContextClassLoader() : null);
            taskState = new SetThreadState(state);
        }

        @Override
        public void run() {
            ThreadState state =
                    AccessController.doPrivileged(new GetThreadState());
            if (state == null) {
                runnable.run();
                return;
            }
            Boolean success = AccessController.doPrivileged(taskState);
            try {
                runnable.run();
            } finally {
                if (success.booleanValue()) {
                    // Restore the thread's original name and context class loader.
                    AccessController.doPrivileged(new SetThreadState(state));
                }
            }
        }
    }
    private static class SetThreadState implements PrivilegedAction<Boolean> {
        private final ThreadState state;

        SetThreadState(ThreadState state) {
            this.state = state;
        }

        @Override
        public Boolean run() {
            try {
                Thread thread = Thread.currentThread();
                thread.setName(state.getName());
                thread.setContextClassLoader(state.getContextClassLoader());
            } catch (SecurityException e) {
                return Boolean.FALSE;
            }
            return Boolean.TRUE;
        }
    }
    private static class GetThreadState implements PrivilegedAction<ThreadState> {
        @Override
        public ThreadState run() {
            try {
                Thread thread = Thread.currentThread();
                return new ThreadState(thread.getName(),
                        thread.getContextClassLoader());
            } catch (SecurityException e) {
                return null;
            }
        }
    }
    private static class ThreadState {
        private final String name;
        private final ClassLoader ccl;

        ThreadState(String name, ClassLoader ccl) {
            this.name = name;
            this.ccl = ccl;
        }

        String getName() {
            return name;
        }

        ClassLoader getContextClassLoader() {
            return ccl;
        }
    }
    private static class Factory implements ThreadFactory {
        private final ThreadGroup threadGroup;

        Factory(ThreadGroup threadGroup) {
            this.threadGroup = threadGroup;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(threadGroup, r);
        }
    }
}
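A hypothetical usage sketch follows (the thread group name, blocking coefficient and task name are made up for illustration, and the snippet would have to live in the com.sun.jini.thread package since the class and its constructor are package-private):

// Illustrative only: names and values are arbitrary.
ThreadGroup group = new ThreadGroup("security-check-workers");
ImprovedThreadPool pool = new ImprovedThreadPool(group, 0.9); // mostly IO-bound tasks

pool.execute(new Runnable() {
    public void run() {
        // perform the (possibly blocking) permission check here
    }
}, "SocketPermission check");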
Peter Firmstone wrote:
ThreadPool implements the com.sun.jini.thread.Executor interface.
Because the interface states it should not block, I think the calling
thread should execute the task when the thread pool becomes saturated,
rather than continuing to create new threads as the current
implementation does. This ensures that the task is completed; a calling
thread that submits a sequence of tasks with dependencies will have to
ensure that it submits them in order. Since only the caller knows the
order, it makes sense for this to be the caller's responsibility. As a
result the executor will honor the non-blocking contract. For that
reason we'll use a zero-length queue, probably a SynchronousQueue.
Regards,
Peter.
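As a concrete illustration of the choice Peter describes above, here is a minimal, self-contained sketch using plain java.util.concurrent (nothing River-specific; the pool sizes and sleep are arbitrary). A SynchronousQueue combined with CallerRunsPolicy hands a task to the submitting thread once the pool is saturated, instead of queueing or blocking:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        // Two pool threads at most, zero-length queue: a third concurrent
        // submission is rejected by the pool and run by the caller instead.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 20L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("running in " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        pool.execute(task);  // picked up by a pool thread
        pool.execute(task);  // picked up by the second pool thread
        pool.execute(task);  // pool saturated: runs in the main thread
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}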
/**
* Executor is an abstraction for a thread factory or thread pool for
* executing actions asynchronously.
*
* @author Sun Microsystems, Inc.
*
*/
public interface Executor {
/**
* Executes the given Runnable action asynchronously in some thread.
*
* The implementation may create a new thread to execute the action,
* or it may execute the action in an existing thread.
*
* The execution of a given action must not be delayed indefinitely
* in order to complete execution of a different action passed to a
* different invocation of this method. In other words, the
* implementation must assume that there may be arbitrary dependencies
* between actions passed to this method, so it needs to be careful
* to avoid potential deadlock by delaying execution of one action
* indefinitely until another completes.
*
* Also, this method itself must not block, because it may be invoked
* by code that is serially processing data to produce multiple such
* arbitrarily-dependent actions that need to be executed.
*
* @param runnable the Runnable action to execute
*
* @param name string to include in the name of the thread used
* to execute the action
*/
void execute(Runnable runnable, String name);
}
Peter Firmstone wrote:
Thanks Brian,
Looking at our implementation code, DGC uses an Executor called
ThreadPool, it's javadoc states:
/**
* ThreadPool is a simple thread pool implementation of the Executor
* interface.
*
* A new task is always given to an idle thread, if one is available;
* otherwise, a new thread is always created. There is no minimum
* warm thread count, nor is there a maximum thread count (tasks are
* never queued unless there are sufficient idle threads to execute
* them).
*
* New threads are created as daemon threads in the thread group that
* was passed to the ThreadPool instance's constructor. Each thread's
* name is the prefix NewThreadAction.NAME_PREFIX followed by the name
* of the task it is currently executing, or "Idle" if it is currently
* idle.
ThreadPool predates Java 5; it looks like we can fix this by using an
Executor from Java 5. We can look at limiting the number of threads
created based on the available CPUs and a scaling factor, and place the
tasks in a BlockingQueue, so that if the queue fills up, it blocks (a
rough sizing sketch follows this message).
Can you report the issue as a bug on Jira for me? I'll fix this
before the next release.
Regards,
Peter.
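The rough sizing sketch mentioned above, as a back-of-the-envelope calculation (the 0.9 scaling factor is an assumed example, not a measured figure):

int cores = Runtime.getRuntime().availableProcessors();
double blockingCoefficient = 0.9; // 0 for CPU-bound work, up to 0.9 for IO-bound work
int maxThreads = (int) (cores / (1 - blockingCoefficient));
// e.g. with 4 cores: 4 / (1 - 0.9) = 40 threads; with 0.0 it stays at 4.
System.out.println("maximum pool size: " + maxThreads);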
Peter Firmstone wrote:
Hi Peter,
I was wondering if you had any thoughts on this post from Bryan on
River users?
Hope you don't mind me asking ;)
Best Regards,
Peter Firmstone.
------------------------------------------------------------------------
Subject: Re: DGC threads issue
From: Tom Hobbs <tvho...@googlemail.com>
Date: Thu, 12 Jan 2012 20:45:01 +0000
To: u...@river.apache.org, dev@river.apache.org
Hi Bryan,
Sorry that no one got back to you about this. I'm afraid I don't
know the answer to your question, but I've copied the dev list into this
email in case someone who monitors that list (but not this one) has
any ideas.
Best regards,
Tom
On Thu, Jan 12, 2012 at 2:29 PM, Bryan Thompson <br...@systap.com>
wrote:
Just to follow up on this thread myself: I modified the pattern to
return a "thick" future rather than a proxy for the future. This
caused the RMI call to wait on the server until the future was done
and then send back the outcome. This "fixed" the DGC memory/thread
leak by dramatically reducing the number of exported proxies.
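For illustration, a hedged sketch of the two patterns Bryan contrasts; every name below (Job, Outcome, RemoteFuture, TaskService and the method names) is hypothetical and does not come from his code or from River:

import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;

// Hypothetical types, for illustration only.
interface Job extends Runnable, Serializable { }

interface Outcome extends Serializable { }

// A remote handle to a server-side computation. Each instance must be
// exported, so each one carries its own DGC lease.
interface RemoteFuture extends Remote {
    Outcome get() throws RemoteException, InterruptedException;
}

interface TaskService extends Remote {
    // Original pattern: every call exports a fresh RemoteFuture, which is
    // what accumulates the per-proxy DGC threads discussed in this thread.
    RemoteFuture submitReturningProxy(Job job) throws RemoteException;

    // The "thick" alternative: the server waits for the computation to
    // complete and returns only the serializable outcome, so nothing
    // extra is exported per call.
    Outcome submitAndWait(Job job) throws RemoteException;
}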
In terms of best practices, is distributed DGC simply not useful
for exported objects with short life spans? Can it only be used
with proxies for relatively long lived services?
Thanks,
Bryan
-----Original Message-----
From: Bryan Thompson
Sent: Tuesday, January 03, 2012 12:06 PM
To: u...@river.apache.org
Subject: DGC threads issue
Hello,
Background:
I am seeing what would appear to be one DGC thread allocated
per exported object. This is using River 2.2 and Sun JDK
1.6.0_17. Relevant configuration parameters are below.
I am observing problems with the DGC threads not being
retired on a timely basis. The exported objects are proxies
for Futures which are being executed on the service. The
code pattern is such that the proxied Future goes out of
lexical scope quite quickly. E.g.,
rmiCallReturningProxyForFuture().get().
Under a modest load, a large number of such Futures are
exported, which results in a large number of long-lived DGC
threads. This turns into a problem for the JVM due to the
stack allocation per thread. Presumably this is not good for
other reasons as well (e.g., scheduling).
I have tried to override the leaseValue and checkInterval
defaults per the configuration options below. I suspect that
the lease interval is somehow not being obeyed, which is
presumably a problem on my end. However, I can verify that
the configuration values are in fact showing up in
System.getProperties() for at least some of the JVMs involved
(the one which drives the workload and the one that I am
monitoring with the large number of DGC lease threads).
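A trivial way to perform that check from inside the JVM being monitored (standard System.getProperty calls, nothing River-specific):

// Print the overrides actually visible to this JVM; null means the
// built-in default is in effect.
System.out.println("java.rmi.dgc.leaseValue = "
        + System.getProperty("java.rmi.dgc.leaseValue"));
System.out.println("sun.rmi.dgc.checkInterval = "
        + System.getProperty("sun.rmi.dgc.checkInterval"));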
Some questions:
Is this one-thread-per-exported proxy the expected behavior
when DGC is requested for the exported object?
The DGC lease checker threads appear to expire ~14 - 15
minutes after I terminate the process which was originating
the RMI requests. This is close to the sum of the default
leaseValue (10m) and checkInterval (5m) parameters, but maybe
there is some other timeout which is controlling this? If
this is the sum of those parameters, why would the DGC lease
threads live until the sum of those values? I thought that
the lease would expire after the leaseValue (10m default).
Can the issue I am observing be caused by a low heap pressure
on the JVM to which the RMI proxies were exported? If it
fails to GC those proxies, even though they are reachable,
could that cause DGC to continue to retain those proxies on
the JVM which exported them?
Is there any way to configure DGC to use a thread pool or to
have the leases managed by a single thread?
Is it possible that there is an interaction with the useNIO option?
Relevant options that I am using include:
-Dcom.sun.jini.jeri.tcp.useNIO=true
-Djava.rmi.dgc.leaseValue=30000
-Dsun.rmi.dgc.checkInterval=15000
-Dsun.rmi.transport.tcp.connectionPool=true
Thanks in advance,
Bryan