Author: dblevins
Date: Fri Jun 4 23:07:20 2010
New Revision: 951611
URL: http://svn.apache.org/viewvc?rev=951611&view=rev
Log:
OPENEJB-1292: Client Failover on connection pool timeout
OPENEJB-1293: Conditional Client Failover based on container or bean thrown Exception types
Added:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java
(with props)
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FullPoolFailoverTest.java
- copied, changed from r950801,
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
Modified:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/CountingLatch.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Modified:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/CountingLatch.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/CountingLatch.java?rev=951611&r1=951610&r2=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/CountingLatch.java
(original)
+++
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/CountingLatch.java
Fri Jun 4 23:07:20 2010
@@ -16,11 +16,7 @@
*/
package org.apache.openejb.util;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
/**
@@ -30,8 +26,12 @@ public class CountingLatch {
private final Sync sync;
- public CountingLatch() {
- this.sync = new Sync();
+ public CountingLatch() {
+ this(0);
+ }
+
+ public CountingLatch(int count) {
+ this.sync = new Sync(count);
}
public void await() throws InterruptedException {
@@ -55,8 +55,8 @@ public class CountingLatch {
}
private static final class Sync extends AbstractQueuedSynchronizer {
- private Sync() {
- setState(0);
+ private Sync(int count) {
+ setState(count);
}
public boolean tryReleaseShared(int releases) {
Modified:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java?rev=951611&r1=951610&r2=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
(original)
+++
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/Client.java
Fri Jun 4 23:07:20 2010
@@ -18,6 +18,7 @@ package org.apache.openejb.client;
import static org.apache.openejb.client.Exceptions.newIOException;
+import javax.ejb.ConcurrentAccessTimeoutException;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -29,7 +30,9 @@ import java.rmi.RemoteException;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.net.URI;
@@ -41,14 +44,23 @@ public class Client {
private static final ProtocolMetaData PROTOCOL_VERSION = new
ProtocolMetaData("3.1");
+ private List<Class<? extends Throwable>> retryConditions = new
CopyOnWriteArrayList();
private static Client client = new Client();
private boolean retry = false;
public Client() {
- String retryValue = System.getProperty("openejb.client.requestretry",
retry + "");
+ String retryValue = System.getProperty("openejb.client.requestretry",
getRetry() + "");
retry = new Boolean(retryValue);
}
+ public static boolean addRetryCondition(Class<? extends Throwable>
throwable) {
+ return client.retryConditions.add(throwable);
+ }
+
+ public static boolean removeRetryCondition(Class<? extends Throwable>
throwable) {
+ return client.retryConditions.remove(throwable);
+ }
+
// This lame hook point if only of testing
public static void setClient(Client client) {
Client.client = client;
@@ -249,18 +261,32 @@ public class Client {
throw new RemoteException("Error reading response from server
(" + protocolMetaData.getSpec() + ") : " + e.getMessage(), e);
}
+ if (retryConditions.size() > 0) {
+ if (res instanceof EJBResponse) {
+ EJBResponse ejbResponse = (EJBResponse) res;
+ if (ejbResponse.getResult() instanceof ThrowableArtifact) {
+ ThrowableArtifact artifact = (ThrowableArtifact)
ejbResponse.getResult();
+ if
(retryConditions.contains(artifact.getThrowable().getClass())) {
+ throw new RetryException(res);
+ }
+ }
+ }
+ }
} catch (RemoteException e) {
throw e;
} catch (IOException e){
Set<URI> failed = getFailed();
failed.add(conn.getURI());
conn.discard();
- if (retry){
+ if (e instanceof RetryException || getRetry()){
try {
processRequest(req, res, server);
} catch (RemoteFailoverException re) {
throw re;
} catch (RemoteException re) {
+ if (e instanceof RetryException) {
+ return ((RetryException) e).getResponse();
+ }
throw new RemoteFailoverException("Cannot complete
request. Retry attempted on " + failed.size() + " servers", e);
}
}
@@ -304,4 +330,7 @@ public class Client {
return cluster;
}
+ private boolean getRetry() {
+ return retry = new
Boolean(System.getProperty("openejb.client.requestretry", retry + ""));
+ }
}
Modified:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java?rev=951611&r1=951610&r2=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
(original)
+++
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
Fri Jun 4 23:07:20 2010
@@ -16,12 +16,12 @@
*/
package org.apache.openejb.client;
-import javax.ejb.ConcurrentAccessException;
+import java.io.IOException;
/**
* @version $Rev$ $Date$
*/
-public class ConnectionPoolTimeoutException extends ConcurrentAccessException {
+public class ConnectionPoolTimeoutException extends IOException {
public ConnectionPoolTimeoutException() {
}
Added:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java?rev=951611&view=auto
==============================================================================
---
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java
(added)
+++
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java
Fri Jun 4 23:07:20 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.client;
+
+import java.io.IOException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RetryException extends IOException {
+ private final Response response;
+
+ public RetryException(Response response) {
+ this.response = response;
+ }
+
+ public Response getResponse() {
+ return response;
+ }
+}
Propchange:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/RetryException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java?rev=951611&r1=951610&r2=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
(original)
+++
openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
Fri Jun 4 23:07:20 2010
@@ -139,7 +139,7 @@ public class SocketConnectionFactory imp
private Pool getPool(URI uri) {
Pool pool = connections.get(uri);
if (pool == null) {
- pool = new Pool(getSize(), getTimeout());
+ pool = new Pool(uri, getSize(), getTimeout());
connections.put(uri, pool);
}
return pool;
@@ -287,14 +287,16 @@ public class SocketConnectionFactory imp
}
}
- public static class Pool {
+ private static class Pool {
private final Semaphore semaphore;
private final Stack<SocketConnection> pool;
private final long timeout;
private final TimeUnit timeUnit;
private final int size;
+ private final URI uri;
- public Pool(int size, long timeout) {
+ private Pool(URI uri, int size, long timeout) {
+ this.uri = uri;
this.size = size;
this.semaphore = new Semaphore(size);
this.pool = new Stack<SocketConnection>();
@@ -307,7 +309,7 @@ public class SocketConnectionFactory imp
}
}
- public SocketConnection get() {
+ public SocketConnection get() throws IOException{
try {
if (semaphore.tryAcquire(timeout, timeUnit)) {
return pool.pop();
@@ -323,5 +325,14 @@ public class SocketConnectionFactory imp
pool.push(connection);
semaphore.release();
}
+
+ @Override
+ public String toString() {
+ return "Pool{" +
+ "size=" + size +
+ ", available=" + semaphore.availablePermits() +
+ ", uri=" + uri +
+ '}';
+ }
}
}
Modified:
openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=951611&r1=951610&r2=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
(original)
+++
openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Fri Jun 4 23:07:20 2010
@@ -132,7 +132,7 @@ public class KeepAliveServer implements
session.usage.unlock();
}
} else {
- logger.info("Allowing graceful shutdown of " +
session.socket.getInetAddress());
+ logger.debug("Allowing graceful shutdown of " +
session.socket.getInetAddress());
}
}
}
Copied:
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FullPoolFailoverTest.java
(from r950801,
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java)
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FullPoolFailoverTest.java?p2=openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FullPoolFailoverTest.java&p1=openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java&r1=950801&r2=951611&rev=951611&view=diff
==============================================================================
---
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
(original)
+++
openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FullPoolFailoverTest.java
Fri Jun 4 23:07:20 2010
@@ -19,6 +19,8 @@ package org.apache.openejb.server.ejbd;
import junit.framework.TestCase;
import org.apache.openejb.OpenEJB;
import org.apache.openejb.client.ConnectionPoolTimeoutException;
+import org.apache.openejb.client.Client;
+import org.apache.openejb.util.CountingLatch;
import org.apache.openejb.assembler.classic.Assembler;
import org.apache.openejb.assembler.classic.StatelessSessionContainerInfo;
import org.apache.openejb.config.ConfigurationFactory;
@@ -28,183 +30,212 @@ import org.apache.openejb.jee.StatelessB
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.server.ServiceDaemon;
import org.apache.openejb.server.ServicePool;
+import org.apache.openejb.server.ServerServiceFilter;
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
import javax.ejb.Remote;
import javax.ejb.Stateless;
-import javax.ejb.ConcurrentAccessException;
+import javax.ejb.ConcurrentAccessTimeoutException;
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NamingException;
import java.util.Properties;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.net.URI;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
/**
* @version $Rev$ $Date$
*/
-public class MultithreadTest extends TestCase {
+public class FullPoolFailoverTest extends TestCase {
- private ServiceDaemon serviceDaemon;
private Counter counter;
- private static CountDownLatch startPistol;
- private static CountDownLatch startingLine;
- private static CountDownLatch invocations;
-
- public void testStatelessBeanPooling() throws Exception {
-
- startPistol = new CountDownLatch(1);
- startingLine = new CountDownLatch(10);
- final CountDownLatch finishingLine = new CountDownLatch(30);
+ private static CountDownLatch resume;
+ private static CountingLatch paused;
+ private static final URI red = URI.create("red");
+ private static final URI blue = URI.create("blue");
+
+ public static List<URI> hits = new ArrayList<URI>();
+ public static List<URI> hold = new ArrayList<URI>();
+
+ public void testStatelessBeanTimeout() throws Exception {
+
+ setup(10, 20);
+
+ Client.addRetryCondition(ConcurrentAccessTimeoutException.class);
+
+ resume = new CountDownLatch(1);
+ paused = new CountingLatch(10);
// Do a business method...
- Runnable r = new Runnable() {
- public void run() {
- counter.race();
- finishingLine.countDown();
- }
+ Runnable r = new Runnable(){
+ public void run(){
+ counter.hit();
+ }
};
- // -- READY --
-
- // How much ever the no of client invocations the count should be 10
as only 10 instances will be created.
- for (int i = 0; i < 30; i++) {
+ hold.add(red);
+
+ for (int i = 0; i < 10; i++) {
Thread t = new Thread(r);
t.start();
}
- // Wait for the beans to reach the finish line
- startingLine.await(1000, TimeUnit.MILLISECONDS);
-
- // -- SET --
-
- assertEquals(10, CounterBean.instances.get());
-
- // -- GO --
-
- startPistol.countDown(); // go
-
- finishingLine.await(1000, TimeUnit.MILLISECONDS);
-
- // -- DONE --
+ // Wait for the beans to reach the start line
+ assertTrue("expected 10 invocations", paused.await(3000,
TimeUnit.MILLISECONDS));
assertEquals(10, CounterBean.instances.get());
+ assertEquals(10, hits.size());
- }
-
- public void testStatelessBeanRelease() throws Exception {
+ List<URI> expected = new ArrayList<URI>();
+ for (int i = 0; i < 10; i++) expected.add(red);
- invocations = new CountDownLatch(30);
+ assertEquals(expected, hits);
- // Do a business method...
- Runnable r = new Runnable(){
- public void run(){
- try{
- counter.explode();
- }catch(Exception e){
-
- }
- }
- };
-
- // -- READY --
-
- // 30 instances should be created and discarded.
- for (int i = 0; i < 30; i++) {
- Thread t = new Thread(r);
- t.start();
+ // This one should failover to the blue server
+ try {
+ counter.hit();
+ fail("Expected ConcurrentAccessTimeoutException");
+ } catch (ConcurrentAccessTimeoutException e) {
+ // both "red" and "blue" servers are technically using the
+ // same stateless session bean pool, which is fully busy
+ // but ... this exception should have come from the "blue" server
}
- boolean success = invocations.await(10000, TimeUnit.MILLISECONDS);
+ // one more hit on red that should have failed over to blue
+ expected.add(red);
+ expected.add(blue);
+
+ assertEquals(expected, hits);
+
+ // A second invoke on this should now start out talking to blue,
+ // then it should fail back to red
+ try {
+ counter.hit();
+ fail("Expected ConcurrentAccessTimeoutException");
+ } catch (ConcurrentAccessTimeoutException e) {
+ }
- assertTrue(success);
+ expected.add(blue);
+ expected.add(red);
- assertEquals(30, CounterBean.discardedInstances.get());
+ assertEquals(expected, hits);
+ resume.countDown(); // go
}
+ public void testConnectionPoolTimeout() throws Exception {
- public void testStatelessBeanTimeout() throws Exception {
+ setup(30, 10);
- final CountDownLatch timeouts = new CountDownLatch(10);
- startPistol = new CountDownLatch(1);
- startingLine = new CountDownLatch(10);
+ resume = new CountDownLatch(1);
+
+ // This is used to cause invoking threads to pause
+ // so all pools can be depleted
+ paused = new CountingLatch(10);
// Do a business method...
Runnable r = new Runnable(){
public void run(){
- try{
- counter.race();
- }catch (ConcurrentAccessException ex){
- comment("Leap Start");
- timeouts.countDown();
- assertEquals("An invocation of the Stateless Session Bean
CounterBean has timed-out", ex.getMessage());
- } catch (Throwable t) {
- fail("Unexpected exception" + t.getClass().getName() + " "
+ t.getMessage());
- }
+ counter.hit();
}
};
+ hold.add(red);
- comment("On your mark!");
+ List<URI> expected = new ArrayList<URI>();
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 10; i++) {
+ expected.add(red);
Thread t = new Thread(r);
t.start();
}
// Wait for the beans to reach the start line
- assertTrue("expected 10 invocations", startingLine.await(3000,
TimeUnit.MILLISECONDS));
+ assertTrue("expected 10 invocations", paused.await(3000,
TimeUnit.MILLISECONDS));
- comment("Get Set!");
+ assertEquals(10, CounterBean.instances.get());
+ assertEquals(10, hits.size());
+ assertEquals(expected, hits);
- // Wait for the other beans timeout
- assertTrue("expected 10 timeouts", timeouts.await(300000,
TimeUnit.MILLISECONDS));
+ // This one should failover to the blue server
+ URI uri = counter.hit();
+ assertEquals(blue, uri);
+
+ // the red pool is fully busy, so we should have failed over to blue
+ expected.add(blue);
+ assertEquals(expected, hits);
+
+ // Now hold blue as well
+ hold.add(blue);
+
+ for (int i = 0; i < 10; i++) {
+ paused.countUp();
+ expected.add(blue);
+ Thread t = new Thread(r);
+ t.start();
+ }
- assertEquals(10, CounterBean.instances.get());
+ // Wait for the beans to reach the start line
+ assertTrue("expected 20 invocations", paused.await(3000,
TimeUnit.MILLISECONDS));
+
+ // The extra 10 invocations should all have been on the "blue" server
+ assertEquals(expected, hits);
+
+ // A second invoke on this should now start out talking to blue,
+ // then it should fail back to red
+ try {
+ counter.hit();
+ fail("Expected javax.ejb.EJBException");
+ } catch (javax.ejb.EJBException e) {
+ }
- comment("Go!");
+ // there should be no hits on any server, both connection pools are fully busy
+ assertEquals(expected, hits);
- startPistol.countDown(); // go
+ resume.countDown(); // go
}
+
@Override
protected void tearDown() throws Exception {
super.tearDown();
- serviceDaemon.stop();
- OpenEJB.destroy();
- }
- protected void setUp() throws Exception {
- super.setUp();
+ for (ServiceDaemon daemon : daemons) daemon.stop();
- int poolSize = 10;
-
- System.setProperty("openejb.client.connectionpool.size", "" +
(poolSize*2));
+ OpenEJB.destroy();
+ }
- EjbServer ejbServer = new EjbServer();
- KeepAliveServer keepAliveServer = new KeepAliveServer(ejbServer);
+ private final List<ServiceDaemon> daemons = new ArrayList<ServiceDaemon>();
+ protected void setup(int statelessPoolSize, int connectionPoolSize) throws
Exception {
Properties initProps = new Properties();
initProps.setProperty("openejb.deployments.classpath.include", "");
initProps.setProperty("openejb.deployments.classpath.filter.descriptors",
"true");
OpenEJB.init(initProps, new ServerFederation());
- ejbServer.init(new Properties());
- ServicePool pool = new ServicePool(keepAliveServer, "ejbd",
(poolSize*2));
- this.serviceDaemon = new ServiceDaemon(pool, 0, "localhost");
- serviceDaemon.start();
+ System.setProperty("openejb.client.connectionpool.size", "" +
connectionPoolSize);
- int port = serviceDaemon.getPort();
+ EjbServer ejbServer = new EjbServer();
+ ejbServer.init(new Properties());
+ daemons.add(createServiceDaemon(connectionPoolSize, ejbServer, red));
+ daemons.add(createServiceDaemon(connectionPoolSize, ejbServer, blue));
+
ConfigurationFactory config = new ConfigurationFactory();
Assembler assembler =
SystemInstance.get().getComponent(Assembler.class);
// containers
StatelessSessionContainerInfo statelessContainerInfo =
config.configureService(StatelessSessionContainerInfo.class);
statelessContainerInfo.properties.setProperty("TimeOut", "100");
- statelessContainerInfo.properties.setProperty("PoolSize", "" +
poolSize);
+ statelessContainerInfo.properties.setProperty("PoolSize", "" +
statelessPoolSize);
statelessContainerInfo.properties.setProperty("PoolMin", "2");
statelessContainerInfo.properties.setProperty("StrictPooling", "true");
assembler.createContainer(statelessContainerInfo);
@@ -223,14 +254,54 @@ public class MultithreadTest extends Tes
CounterBean.instances.set(0);
assembler.createApplication(config.configureApplication(ejbJar));
+ String failoverURI = "failover:sticky:";
+ failoverURI += "ejbd://127.0.0.1:" + daemons.get(0).getPort() +
"?red,";
+ failoverURI += "ejbd://127.0.0.1:" + daemons.get(1).getPort() +
"?blue";
+
Properties props = new Properties();
props.put("java.naming.factory.initial",
"org.apache.openejb.client.RemoteInitialContextFactory");
- props.put("java.naming.provider.url", "ejbd://127.0.0.1:" + port);
+ props.put("java.naming.provider.url", failoverURI);
Context context = new InitialContext(props);
counter = (Counter) context.lookup("CounterBeanRemote");
+
+ hold.clear();
+ hits.clear();
+ }
+
+ private ServiceDaemon createServiceDaemon(int poolSize, EjbServer
ejbServer, URI uri) throws ServiceException {
+ ServiceIdentifier serviceIdentifier = new ServiceIdentifier(ejbServer,
uri);
+ KeepAliveServer keepAliveServer = new
KeepAliveServer(serviceIdentifier);
+ ServicePool pool = new ServicePool(keepAliveServer, "ejbd", poolSize);
+ ServiceDaemon daemon = new ServiceDaemon(pool, 0, "localhost");
+ daemon.start();
+ return daemon;
}
+ public static ThreadLocal<URI> host = new ThreadLocal<URI>();
+
+ public static class ServiceIdentifier extends ServerServiceFilter {
+ private final URI me;
+
+ public ServiceIdentifier(ServerService service, URI me) {
+ super(service);
+ this.me = me;
+ }
+
+ @Override
+ public void service(InputStream in, OutputStream out) throws
ServiceException, IOException {
+ synchronized (hits){
+ hits.add(me);
+ }
+ host.set(me);
+ try {
+ super.service(in, out);
+ } finally {
+ host.remove();
+ }
+ }
+ }
+
public static Object lock = new Object[]{};
private static void comment(String x) {
@@ -243,9 +314,7 @@ public class MultithreadTest extends Tes
public static interface Counter {
int count();
- void race();
-
- void explode();
+ URI hit();
}
@Remote
@@ -257,7 +326,6 @@ public class MultithreadTest extends Tes
public static class CounterBean implements Counter, RemoteCounter {
public static AtomicInteger instances = new AtomicInteger();
- public static AtomicInteger discardedInstances = new AtomicInteger();
private int count;
@@ -269,28 +337,19 @@ public class MultithreadTest extends Tes
return instances.get();
}
- public int discardCount() {
- return discardedInstances.get();
- }
+ public URI hit() {
+ URI uri = host.get();
- public void explode() {
- try {
- discardedInstances.incrementAndGet();
- throw new NullPointerException();
- } finally {
- invocations.countDown();
+ if (hold.contains(uri)) {
+ try {
+ paused.countDown();
+ resume.await();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
}
- }
- public void race() {
- comment("ready = " + count);
- startingLine.countDown();
- try {
- startPistol.await();
- comment("running = " + count);
- } catch (InterruptedException e) {
- Thread.interrupted();
- }
+ return uri;
}
public void init() {
@@ -301,4 +360,4 @@ public class MultithreadTest extends Tes
}
}
-}
+}
\ No newline at end of file