Modified: openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=693321&r1=693320&r2=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java (original) +++ openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java Mon Sep 8 17:09:25 2008 @@ -16,36 +16,48 @@ */ package org.apache.openejb.server.ejbd; +import org.apache.openejb.loader.SystemInstance; import org.apache.openejb.server.ServerService; import org.apache.openejb.server.ServiceException; import org.apache.openejb.server.ServicePool; -import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.util.LogCategory; +import org.apache.openejb.util.Logger; +import org.apache.openejb.util.Exceptions; +import org.apache.openejb.client.KeepAliveStyle; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; import java.net.Socket; -import java.util.Properties; +import java.net.SocketException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; -import java.util.TimerTask; +import java.util.Properties; import java.util.Timer; -import java.util.Date; -import java.util.Collection; +import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.BlockingQueue; -import 
java.text.SimpleDateFormat; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @version $Rev$ $Date$ */ public class KeepAliveServer implements ServerService { + + private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("keepalive"), KeepAliveServer.class); private final ServerService service; private final long timeout = (1000 * 3); - private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(timeout); + + private final AtomicBoolean stop = new AtomicBoolean(); + private final KeepAliveTimer keepAliveTimer; + private Timer timer; public KeepAliveServer() { this(new EjbServer()); @@ -54,26 +66,27 @@ public KeepAliveServer(ServerService service) { this.service = service; + keepAliveTimer = new KeepAliveTimer(); - Timer timer = new Timer("KeepAliveTimer", true); + timer = new Timer("KeepAliveTimer", true); timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2); - } + public class KeepAliveTimer extends TimerTask { - public static class KeepAliveTimer extends TimerTask { + // Doesn't need to be a map. Could be a set if Session.equals/hashCode only referenced the Thread. 
+ private final Map<Thread, Session> sessions = new ConcurrentHashMap<Thread, Session>(); - private final Map<Thread, Status> statusMap = new ConcurrentHashMap<Thread, Status>(); - - private final long timeout; private BlockingQueue<Runnable> queue; - public KeepAliveTimer(long timeout) { - this.timeout = timeout; + public void run() { + if (!stop.get()) { + closeInactiveSessions(); + } } - public void run() { + private void closeInactiveSessions() { BlockingQueue<Runnable> queue = getQueue(); if (queue == null) return; @@ -82,28 +95,50 @@ long now = System.currentTimeMillis(); - Collection<Status> statuses = statusMap.values(); - for (Status status : statuses) { + for (Session session : sessions.values()) { -// System.out.println(""+status); - - if (status.isReading() && now - status.getTime() > timeout){ -// System.out.println("Thread Interrupt"); + if (session.usage.tryLock()) { try { - backlog--; - status.in.close(); - } catch (IOException e) { - e.printStackTrace(); + if (now - session.lastRequest > timeout) { + try { + backlog--; + session.socket.close(); + } catch (IOException e) { + logger.info("Error closing socket.", e); + } finally { + removeSession(session); + } + } + } finally { + session.usage.unlock(); } } if (backlog <= 0) return; } -// System.out.println("exit"); + } + + public void closeSessions() { + + // Close the ones we can + for (Session session : sessions.values()) { + if (session.usage.tryLock()) { + try { + session.socket.close(); + } catch (IOException e) { + logger.info("Error closing socket.", e); + } finally { + removeSession(session); + session.usage.unlock(); + } + } else { + logger.info("Allowing graceful shutdown of " + session.socket.getInetAddress()); + } + } } private BlockingQueue<Runnable> getQueue() { - if (queue == null){ + if (queue == null) { // this can be null if timer fires before service is fully initialized ServicePool incoming = SystemInstance.get().getComponent(ServicePool.class); if (incoming == null) return null; 
@@ -113,79 +148,87 @@ return queue; } - public Status setStatus(Status status) { -// System.out.println("status = " + status); - return statusMap.put(status.getThread(), status); + public Session addSession(Session session) { + return sessions.put(session.thread, session); + } + + public Session removeSession(Session session) { + return sessions.remove(session.thread); } } - public static class Status { - private final long time; - private final boolean reading; - private final Thread thread; - private static final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS"); - private final InputStream in; + private class Session { - public boolean isReading() { - return reading; - } + private final Thread thread; + private final Lock usage = new ReentrantLock(); - public Thread getThread() { - return thread; - } + // only used inside the Lock + private long lastRequest; - public long getTime() { - return time; - } + // only used inside the Lock + private final Socket socket; - public Status(boolean reading, InputStream in) { - this.reading = reading; + public Session(Socket socket) { + this.socket = socket; + this.lastRequest = System.currentTimeMillis(); this.thread = Thread.currentThread(); - this.time = System.currentTimeMillis(); - this.in = in; } - public String toString() { - String msg = ""; - if (reading) - msg += "READING"; - else msg += "WORKING"; - msg += " "+thread.getName(); + public void service(Socket socket) throws ServiceException, IOException { + keepAliveTimer.addSession(this); - msg += " since "+ format.format(new Date(time)); - return msg; - } - } + int i = -1; + try { + InputStream in = new BufferedInputStream(socket.getInputStream()); + OutputStream out = new BufferedOutputStream(socket.getOutputStream()); - public void service(Socket socket) throws ServiceException, IOException { - InputStream in = new BufferedInputStream(socket.getInputStream()); - OutputStream out = new BufferedOutputStream(socket.getOutputStream()); + while 
(!stop.get()) { + try { + i = in.read(); + } catch (SocketException e) { + // Socket closed. + break; + } + KeepAliveStyle style = KeepAliveStyle.values()[i]; - try { - while (true) { - keepAliveTimer.setStatus(new Status(true, in)); - int i = in.read(); - char c = (char) i; - if (i == 30){ - keepAliveTimer.setStatus(new Status(false, null)); - service.service(new Input(in), new Output(out)); - out.flush(); - } else { - keepAliveTimer.setStatus(new Status(false, null)); - break; + try { + usage.lock(); + + switch(style){ + case PING_PING: { + in.read(); + } + break; + case PING_PONG: { + out.write(style.ordinal()); + out.flush(); + } + } + + service.service(new Input(in), new Output(out)); + out.flush(); + } finally { + this.lastRequest = System.currentTimeMillis(); + usage.unlock(); + } } + } catch (ArrayIndexOutOfBoundsException e){ + throw new IOException("Unexpected byte " + i); + } catch (InterruptedIOException e) { + Thread.interrupted(); + } finally { + keepAliveTimer.removeSession(this); } - } catch (InterruptedIOException e) { - Thread.interrupted(); - } catch (IOException e) { - } finally{ - keepAliveTimer.setStatus(new Status(false, null)); -// System.out.println("close socket"); - socket.close(); } } + + public void service(Socket socket) throws ServiceException, IOException { + Session session = new Session(socket); + session.service(socket); + } + public void service(InputStream in, OutputStream out) throws ServiceException, IOException { } @@ -202,11 +245,16 @@ } public void start() throws ServiceException { - service.start(); + stop.set(false); + +// service.start(); } + public void stop() throws ServiceException { - service.stop(); + stop.set(true); + keepAliveTimer.closeSessions(); +// service.stop(); } public void init(Properties props) throws Exception {
Added: openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java?rev=693321&view=auto ============================================================================== --- openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java (added) +++ openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/FailoverTest.java Mon Sep 8 17:09:25 2008 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openejb.server.ejbd; + +import junit.framework.TestCase; +import org.apache.openejb.OpenEJB; +import org.apache.openejb.OpenEJBException; +import org.apache.openejb.assembler.classic.Assembler; +import org.apache.openejb.config.ConfigurationFactory; +import org.apache.openejb.core.ServerFederation; +import org.apache.openejb.jee.EjbJar; +import org.apache.openejb.jee.StatelessBean; +import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.server.DiscoveryAgent; +import org.apache.openejb.server.DiscoveryListener; +import org.apache.openejb.server.ServerService; +import org.apache.openejb.server.ServerServiceFilter; +import org.apache.openejb.server.ServiceDaemon; +import org.apache.openejb.server.ServiceException; + +import javax.ejb.Remote; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * @version $Rev$ $Date$ + */ +public class FailoverTest extends TestCase { + + private static DiscoveryAgent agent = new TestAgent(); + + public void test() throws Exception { + + Properties initProps = new Properties(); + initProps.setProperty("openejb.deployments.classpath.include", ""); + initProps.setProperty("openejb.deployments.classpath.filter.descriptors", "true"); + OpenEJB.init(initProps, new ServerFederation()); + + SystemInstance.get().setComponent(DiscoveryAgent.class, agent); + + ServerService red = server(Host.RED); + ServerService blue = server(Host.BLUE); + ServerService green = server(Host.GREEN); + + red.start(); + blue.start(); + green.start(); + + TargetRemote target = getBean(red); + + assertEquals(Host.RED, target.getHost()); + + red.stop(); + + assertEquals(Host.BLUE, target.getHost()); + + blue.stop(); + + assertEquals(Host.GREEN, 
target.getHost()); + } + + private TargetRemote getBean(ServerService server) throws NamingException, IOException, OpenEJBException { + int port = server.getPort(); + + Assembler assembler = SystemInstance.get().getComponent(Assembler.class); + ConfigurationFactory config = new ConfigurationFactory(); + + EjbJar ejbJar = new EjbJar(); + ejbJar.addEnterpriseBean(new StatelessBean(Target.class)); + assembler.createApplication(config.configureApplication(ejbJar)); + + // good creds + Properties props = new Properties(); + props.put("java.naming.factory.initial", "org.apache.openejb.client.RemoteInitialContextFactory"); + props.put("java.naming.provider.url", "ejbd://localhost:" + port + "/RED"); + System.setProperty("openejb.client.keepalive", "ping_pong"); + Context context = new InitialContext(props); + TargetRemote target = (TargetRemote) context.lookup("TargetRemote"); + return target; + } + + private ServerService server(Host host) throws Exception { + ServerService server = new EjbServer(); + + server = new HostFilter(server, host); + + server = new ServiceDaemon(server, 0, "localhost"); + + server = new AgentFilter(server, agent, host); + + server.init(new Properties()); + + return server; + } + + + // Simple single-threaded version, way easier on testing + public static class TestAgent implements DiscoveryAgent { + + private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(); + + public void registerService(URI serviceUri) throws IOException { + for (DiscoveryListener listener : listeners) { + listener.serviceAdded(serviceUri); + } + } + + public void reportFailed(URI serviceUri) throws IOException { + } + + public void setDiscoveryListener(DiscoveryListener listener) { + listeners.add(listener); + } + + public void unregisterService(URI serviceUri) throws IOException { + for (DiscoveryListener listener : listeners) { + listener.serviceRemoved(serviceUri); + } + } + + } + + public static enum Host { + RED, BLUE, GREEN; + } + + public 
static ThreadLocal<Host> host = new ThreadLocal<Host>(); + + + public static class AgentFilter extends ServerServiceFilter { + private final Host host; + private final DiscoveryAgent agent; + private URI uri; + + public AgentFilter(ServerService service, DiscoveryAgent agent, Host host) { + super(service); + this.agent = agent; + this.host = host; + } + + public void start() throws ServiceException { + super.start(); + try { + uri = new URI("ejb:ejbd://localhost:" + getPort() + "/" + host); + agent.registerService(uri); + } catch (Exception e) { + throw new ServiceException(e); + } + } + + public void stop() throws ServiceException { + super.stop(); + try { + agent.unregisterService(uri); + } catch (Exception e) { + throw new ServiceException(e); + } + } + } + + public static class HostFilter extends ServerServiceFilter { + private final Host me; + + public HostFilter(ServerService service, Host me) { + super(service); + this.me = me; + } + + public void service(InputStream in, OutputStream out) throws ServiceException, IOException { + try { + host.set(me); + super.service(in, out); + } finally { + host.remove(); + } + } + + public void service(Socket socket) throws ServiceException, IOException { + try { + host.set(me); + super.service(socket); + } finally { + host.remove(); + } + } + } + + public static class Target implements TargetRemote { + public Host getHost() { + return host.get(); + } + } + + @Remote + public static interface TargetRemote { + Host getHost(); + } +} Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java) URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java&r1=691472&r2=693321&rev=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryAgent.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryAgent.java Mon Sep 8 17:09:25 2008 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.openejb.server.discovery; +package org.apache.openejb.server; import java.net.URI; import java.io.IOException; Copied: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java (from r691472, openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java) URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java?p2=openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java&p1=openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java&r1=691472&r2=693321&rev=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-discovery/src/main/java/org/apache/openejb/server/discovery/DiscoveryListener.java (original) +++ 
openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryListener.java Mon Sep 8 17:09:25 2008 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.openejb.server.discovery; +package org.apache.openejb.server; import java.net.URI; @@ -23,7 +23,7 @@ /** * @version $Rev$ $Date$ */ -public interface DiscoveryListener { +public interface DiscoveryListener { public void serviceAdded(URI service); public void serviceRemoved(URI service); Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=693321&view=auto ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (added) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Sep 8 17:09:25 2008 @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.server; + +import org.apache.openejb.loader.SystemInstance; + +import java.net.URI; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ConcurrentHashMap; +import java.io.IOException; + +/** + * @version $Rev$ $Date$ + */ +public class DiscoveryRegistry implements DiscoveryListener, DiscoveryAgent { + + private final DiscoveryAgent agent; + private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(); + private final Map<String, URI> services = new ConcurrentHashMap<String, URI>(); + + private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { + public Thread newThread(Runnable runable) { + Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName()); + t.setDaemon(true); + return t; + } + }); + + public DiscoveryRegistry(DiscoveryAgent agent) { + this.agent = agent; + agent.setDiscoveryListener(this); + SystemInstance.get().setComponent(DiscoveryRegistry.class, this); + SystemInstance.get().setComponent(DiscoveryAgent.class, this); + } + + public Set<URI> getServices() { + return new HashSet<URI>(services.values()); + } + + public void registerService(URI serviceUri) throws IOException { + agent.registerService(serviceUri); + } + + public void reportFailed(URI serviceUri) throws IOException { + agent.reportFailed(serviceUri); + } + + public void unregisterService(URI serviceUri) throws IOException { + agent.unregisterService(serviceUri); + } + + public void 
setDiscoveryListener(DiscoveryListener listener) { + addDiscoveryListener(listener); + } + + public void addDiscoveryListener(DiscoveryListener listener){ + // get the listener caught up + for (URI service : services.values()) { + executor.execute(new ServiceAddedTask(listener, service)); + } + + listeners.add(listener); + } + + public void removeDiscoveryListener(DiscoveryListener listener){ + listeners.remove(listener); + } + + + public void serviceAdded(URI service) { + services.put(service.toString(), service); + for (final DiscoveryListener discoveryListener : getListeners()) { + executor.execute(new ServiceAddedTask(discoveryListener, service)); + } + } + + public void serviceRemoved(URI service) { + + for (final DiscoveryListener discoveryListener : getListeners()) { + executor.execute(new ServiceRemovedTask(discoveryListener, service)); + } + } + + List<DiscoveryListener> getListeners(){ + return Collections.unmodifiableList(listeners); + } + + private abstract static class Task implements Runnable { + protected final DiscoveryListener discoveryListener; + protected final URI service; + + protected Task(DiscoveryListener discoveryListener, URI service) { + this.discoveryListener = discoveryListener; + this.service = service; + } + } + + private static class ServiceRemovedTask extends Task { + public ServiceRemovedTask(DiscoveryListener discoveryListener, URI service) { + super(discoveryListener, service); + } + + public void run() { + if (discoveryListener != null) { + discoveryListener.serviceRemoved(service); + } + } + } + + private static class ServiceAddedTask extends Task { + public ServiceAddedTask(DiscoveryListener discoveryListener, URI service) { + super(discoveryListener, service); + } + + public void run() { + if (discoveryListener != null) { + discoveryListener.serviceAdded(service); + } + } + } +} Added: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java?rev=693321&view=auto ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java (added) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java Mon Sep 8 17:09:25 2008 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.openejb.server; + +import org.apache.openejb.server.ServerService; +import org.apache.openejb.server.ServiceException; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.Properties; + +/** + * TODO: Make this the superclass of the appropriate ServerService implementations + * @version $Rev$ $Date$ + */ +public class ServerServiceFilter implements ServerService { + private final ServerService service; + + public ServerServiceFilter(ServerService service) { + this.service = service; + } + + public String getIP() { + return service.getIP(); + } + + public String getName() { + return service.getName(); + } + + public int getPort() { + return service.getPort(); + } + + public void service(InputStream in, OutputStream out) throws ServiceException, IOException { + service.service(in, out); + } + + public void service(Socket socket) throws ServiceException, IOException { + service.service(socket); + } + + public void start() throws ServiceException { + service.start(); + } + + public void stop() throws ServiceException { + service.stop(); + } + + public void init(Properties props) throws Exception { + service.init(props); + } +} Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java?rev=693321&r1=693320&r2=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceDaemon.java Mon Sep 8 17:09:25 2008 @@ -32,6 +32,12 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Properties; 
+import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; /** */ @@ -143,7 +149,7 @@ secure = getBoolean(props, "secure", false); timeout = 1000; - + next.init(props); } @@ -185,11 +191,11 @@ public void stop() throws ServiceException { synchronized (this) { + next.stop(); if (socketListener != null) { socketListener.stop(); socketListener = null; } - next.stop(); } } @@ -216,35 +222,49 @@ } private static class SocketListener implements Runnable { - private ServerService serverService; - private ServerSocket serverSocket; - private boolean stopped; + private final ServerService serverService; + private final ServerSocket serverSocket; + private AtomicBoolean stop = new AtomicBoolean(); + private Lock lock = new ReentrantLock(); public SocketListener(ServerService serverService, ServerSocket serverSocket) { this.serverService = serverService; this.serverSocket = serverSocket; - stopped = false; } - public synchronized void stop() { - stopped = true; - } - - private synchronized boolean shouldStop() { - return stopped; + public void stop() { + stop.set(true); + try { + if (lock.tryLock(10, TimeUnit.SECONDS)){ + serverSocket.close(); + } + } catch (InterruptedException e) { + Thread.interrupted(); + } catch (IOException e) { + } } public void run() { - while (!shouldStop()) { + while (!stop.get()) { Socket socket = null; try { socket = serverSocket.accept(); socket.setTcpNoDelay(true); - if (!shouldStop()) { + if (!stop.get()) { // the server service is responsible // for closing the socket. 
- serverService.service(socket); + try { + lock.lock(); + serverService.service(socket); + } finally { + lock.unlock(); + } } + + // Sockets are consumed in other threads + // and should never be closed here + // It's up to the consumer of the socket + // to close it. } catch (SocketTimeoutException e) { // we don't really care // log.debug("Socket timed-out",e); @@ -253,15 +273,11 @@ } } - if (serverSocket != null) { - try { - serverSocket.close(); - } catch (IOException ioException) { - log.debug("Error cleaning up socked", ioException); - } - serverSocket = null; + try { + serverSocket.close(); + } catch (IOException ioException) { + log.debug("Error cleaning up socked", ioException); } - serverService = null; } public void setSoTimeout(int timeout) throws SocketException { Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java?rev=693321&r1=693320&r2=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceManager.java Mon Sep 8 17:09:25 2008 @@ -44,6 +44,8 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.LinkedHashMap; +import java.util.Collections; /** * @version $Rev$ $Date$ @@ -144,7 +146,8 @@ ServiceFinder serviceFinder = new ServiceFinder("META-INF/"); - Map availableServices = serviceFinder.mapAvailableServices(ServerService.class); + Map<String, Properties> availableServices = serviceFinder.mapAvailableServices(ServerService.class); + List enabledServers = new ArrayList(); OpenEjbConfiguration conf = SystemInstance.get().getComponent(OpenEjbConfiguration.class); 
@@ -198,6 +201,12 @@ service = (ServerService) recipe.create(serviceClass.getClassLoader()); + if (service instanceof DiscoveryAgent){ + DiscoveryAgent agent = (DiscoveryAgent) service; + DiscoveryRegistry registry = new DiscoveryRegistry(agent); + SystemInstance.get().setComponent(DiscoveryRegistry.class, registry); + } + if (!(service instanceof SelfManaging)) { service = new ServicePool(service, serviceName, serviceProperties); service = new ServiceLogger(service); Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=693321&r1=693320&r2=693321&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java Mon Sep 8 17:09:25 2008 @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; @@ -40,6 +41,7 @@ private final ServerService next; private final Executor executor; private final ThreadPoolExecutor threadPool; + private final AtomicBoolean stop = new AtomicBoolean(); public ServicePool(ServerService next, String name, Properties properties) { this(next, name, getInt(properties, "threads", 100)); @@ -86,6 +88,7 @@ final Runnable service = new Runnable() { public void run() { try { + if (stop.get()) return; next.service(socket); } catch (SecurityException e) { log.error("Security error: " + e.getMessage(), e); @@ -93,6 +96,12 @@ log.error("Unexpected error", e); } finally { try { + // Once the thread is 
done with the socket, clean it up + // The ServiceDaemon does not close the sockets as it is + // single threaded and only accepts sockets and then + // hands them off to be processed. As the thread doing + // that processing it is our job to close the socket + // when we are finished with it. if (socket != null) { socket.close(); }
