hammant 2003/02/04 16:10:27 Modified: altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream CallbackEnabledClientCustomStreamReadWriter.java altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket CallbackEnabledSocketCustomStreamServer.java CallbackServerClientReadWriter.java PartialCallbackEnabledSocketCustomStreamServer.java Added: altrmi/src/java/org/apache/excalibur/altrmi/common ReqRepBytes.java altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/stream CallbackEnabledCustomStreamReadWriter.java Removed: altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream ReqRepBytes.java altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket CallbackEnabledSocketCustomStreamReadWriter.java Log: Updated server side of callbacks to be more robust Revision Changes Path 1.16 +9 -26 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream/CallbackEnabledClientCustomStreamReadWriter.java Index: CallbackEnabledClientCustomStreamReadWriter.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream/CallbackEnabledClientCustomStreamReadWriter.java,v retrieving revision 1.15 retrieving revision 1.16 diff -u -r1.15 -r1.16 --- CallbackEnabledClientCustomStreamReadWriter.java 4 Feb 2003 14:36:29 -0000 1.15 +++ CallbackEnabledClientCustomStreamReadWriter.java 5 Feb 2003 00:10:25 -0000 1.16 @@ -15,6 +15,7 @@ import java.io.OutputStream; import java.io.EOFException; import java.util.HashMap; +import java.net.SocketException; import org.apache.excalibur.altrmi.client.impl.ClientStreamReadWriter; import org.apache.excalibur.altrmi.common.CallbackException; @@ -26,6 +27,7 @@ import org.apache.excalibur.altrmi.common.ThreadContext; import org.apache.excalibur.altrmi.common.ExceptionReply; import org.apache.excalibur.altrmi.common.InvocationException; +import org.apache.excalibur.altrmi.common.ReqRepBytes; import org.apache.excalibur.altrmi.server.Server; import org.apache.excalibur.altrmi.server.PublicationException; import org.apache.excalibur.altrmi.server.ServerException; @@ -99,14 +101,15 @@ { while (!m_isStopped) { - ReqRepBytes reqRepBytes = readByteArrayFromInputStream(); + ReqRepBytes reqRepBytes = ReqRepBytes.getRequestReplyBytesFromDataStream(m_dataInputStream); if (reqRepBytes.getIOException() != null) { IOException ioe = reqRepBytes.getIOException(); - if (ioe instanceof EOFException) + if (ioe != null) { - if (reqRepBytes.ioeDuringReadInt()) + if ((ioe instanceof EOFException && reqRepBytes.ioeDuringReadInt()) + || ioe instanceof SocketException) { m_isStopped = true; // restart ? @@ -117,7 +120,7 @@ } // Is a reply. - if (!(reqRepBytes.isRequest().booleanValue())) + if (!(reqRepBytes.isRequest())) { try { @@ -134,7 +137,7 @@ } } // Is a request. - else if (reqRepBytes.isRequest().booleanValue()) + else if (reqRepBytes.isRequest()) { try { @@ -161,26 +164,6 @@ } } } - } - - private ReqRepBytes readByteArrayFromInputStream() - { - int byteArraySize = 0; - Boolean isRequest = null; - byte[] byteArray = null; - IOException ioe = null; - try - { - byteArraySize = m_dataInputStream.readInt(); - isRequest = m_dataInputStream.readBoolean() ? Boolean.TRUE : Boolean.FALSE; - byteArray = new byte[byteArraySize]; - m_dataInputStream.read(byteArray); - } - catch (IOException e) - { - ioe = e; - } - return new ReqRepBytes(byteArraySize, byteArray, isRequest, ioe); } /** 1.6 +3 -2 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackEnabledSocketCustomStreamServer.java Index: CallbackEnabledSocketCustomStreamServer.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackEnabledSocketCustomStreamServer.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- CallbackEnabledSocketCustomStreamServer.java 29 Jan 2003 08:01:13 -0000 1.5 +++ CallbackEnabledSocketCustomStreamServer.java 5 Feb 2003 00:10:26 -0000 1.6 @@ -9,6 +9,7 @@ import org.apache.excalibur.altrmi.server.ServerException; import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter; +import org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter; import org.apache.excalibur.altrmi.server.impl.adapters.InvocationHandlerAdapter; import org.apache.excalibur.altrmi.server.impl.socket.AbstractCompleteSocketStreamServer; @@ -65,7 +66,7 @@ */ protected ServerStreamReadWriter createServerStreamReadWriter() { - CallbackEnabledSocketCustomStreamReadWriter callbackEnabledSocketCustomStreamReadWriter = new CallbackEnabledSocketCustomStreamReadWriter(); + CallbackEnabledCustomStreamReadWriter callbackEnabledSocketCustomStreamReadWriter = new CallbackEnabledCustomStreamReadWriter(); callbackEnabledSocketCustomStreamReadWriter.setThreadContextProvider(getThreadContextProvider()); return callbackEnabledSocketCustomStreamReadWriter; } 1.7 +5 -4 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackServerClientReadWriter.java Index: CallbackServerClientReadWriter.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackServerClientReadWriter.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- CallbackServerClientReadWriter.java 5 Jan 2003 23:24:45 -0000 1.6 +++ CallbackServerClientReadWriter.java 5 Feb 2003 00:10:26 -0000 1.7 @@ -11,6 +11,7 @@ import org.apache.excalibur.altrmi.client.impl.ClientStreamReadWriter; import org.apache.excalibur.altrmi.common.Reply; import org.apache.excalibur.altrmi.common.Request; +import org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter; /** * Class CallbackServerClientReadWriter @@ -23,7 +24,7 @@ public class CallbackServerClientReadWriter implements ClientStreamReadWriter { - private CallbackEnabledSocketCustomStreamReadWriter m_callbackEnabledCustomSocketStreamReadWriter; + private CallbackEnabledCustomStreamReadWriter m_callbackEnabledCustomSocketStreamReadWriter; /** * Constructor CallbackServerClientReadWriter @@ -32,8 +33,8 @@ * @param callbackEnabledCustomSocketStreamReadWriter * */ - CallbackServerClientReadWriter( - CallbackEnabledSocketCustomStreamReadWriter callbackEnabledCustomSocketStreamReadWriter ) + public CallbackServerClientReadWriter( + CallbackEnabledCustomStreamReadWriter callbackEnabledCustomSocketStreamReadWriter ) { this.m_callbackEnabledCustomSocketStreamReadWriter = callbackEnabledCustomSocketStreamReadWriter; 1.5 +3 -2 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/PartialCallbackEnabledSocketCustomStreamServer.java Index: PartialCallbackEnabledSocketCustomStreamServer.java =================================================================== RCS file: /home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/PartialCallbackEnabledSocketCustomStreamServer.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- PartialCallbackEnabledSocketCustomStreamServer.java 11 Jan 2003 16:10:08 -0000 1.4 +++ PartialCallbackEnabledSocketCustomStreamServer.java 5 Feb 2003 00:10:26 -0000 1.5 @@ -8,6 +8,7 @@ package org.apache.excalibur.altrmi.server.impl.callback.socket; import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter; +import org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter; import org.apache.excalibur.altrmi.server.impl.adapters.InvocationHandlerAdapter; import org.apache.excalibur.altrmi.server.impl.socket.AbstractPartialSocketStreamServer; @@ -49,6 +50,6 @@ */ protected ServerStreamReadWriter createServerStreamReadWriter() { - return new CallbackEnabledSocketCustomStreamReadWriter(); + return new CallbackEnabledCustomStreamReadWriter(); } } 1.1 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/common/ReqRepBytes.java Index: ReqRepBytes.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE.txt file. */ package org.apache.excalibur.altrmi.common; import java.io.IOException; import java.io.DataInputStream; /** * ReqRepBytes * * @author Paul Hammant * @version $Revision: 1.1 $ */ public class ReqRepBytes { int m_byteSize; byte[] m_bytes; Boolean m_isRequest; IOException m_ioe; public ReqRepBytes(int byteSize, byte[] bytes, Boolean isRequest, IOException ioe) { m_byteSize = byteSize; m_bytes = bytes; m_isRequest = isRequest; m_ioe = ioe; } public boolean ioeDuringReadInt() { return (m_ioe != null & m_byteSize == 0); } public int getByteSize() { return m_byteSize; } public byte[] getBytes() { return m_bytes; } // request or reply public boolean isRequest() { return m_isRequest.booleanValue(); } public IOException getIOException() { return m_ioe; } public static ReqRepBytes getRequestReplyBytesFromDataStream(DataInputStream dis) { int byteArraySize = 0; Boolean isRequest = null; byte[] byteArray = null; IOException ioe = null; try { byteArraySize = dis.readInt(); isRequest = dis.readBoolean() ? Boolean.TRUE : Boolean.FALSE; byteArray = new byte[ byteArraySize ]; dis.read( byteArray ); } catch (IOException e) { ioe = e; } return new ReqRepBytes(byteArraySize, byteArray, isRequest, ioe); } } 1.1 jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/stream/CallbackEnabledCustomStreamReadWriter.java Index: CallbackEnabledCustomStreamReadWriter.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE.txt file. */ package org.apache.excalibur.altrmi.server.impl.callback.stream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.EOFException; import java.net.SocketException; import org.apache.excalibur.altrmi.client.ConnectionClosedException; import org.apache.excalibur.altrmi.common.Reply; import org.apache.excalibur.altrmi.common.Request; import org.apache.excalibur.altrmi.common.ExposedObjectProxy; import org.apache.excalibur.altrmi.common.MethodRequest; import org.apache.excalibur.altrmi.common.SerializationHelper; import org.apache.excalibur.altrmi.common.ThreadContext; import org.apache.excalibur.altrmi.common.ReqRepBytes; import org.apache.excalibur.altrmi.common.ExceptionReply; import org.apache.excalibur.altrmi.common.InvocationException; import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter; import org.apache.excalibur.altrmi.server.impl.callback.CallbackHostContext; import org.apache.excalibur.altrmi.server.impl.callback.CallbackServerClassFactory; import org.apache.excalibur.altrmi.server.impl.callback.socket.CallbackServerClientReadWriter; import org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackStreamInvocationHandler; /** * CallbackEnabledCustomStreamReadWriter * This StreamReadWriter is responsible for unmarshalling * exposedClient objects too and devliering it to the Remote Object. * The calls made for the exposed Client Object is also marshalled * out here with the aid of the internal InvocationHandler. * The callbacks are multiplexed over the same connection and thus * there are no extra network connections needed to enable callbacks. * * @author <a href="mailto:[EMAIL PROTECTED]">Vinay Chandran</a> * @author Paul Hammant * @version $Revision: 1.1 $ */ public class CallbackEnabledCustomStreamReadWriter extends ServerStreamReadWriter implements Runnable { private DataInputStream m_dataInputStream; private DataOutputStream m_dataOutputStream; private ThreadContext m_messageLoopThread = null; private boolean m_isStopped = false; private Object m_requestLock = new Object(); private Object m_replyLock = new Object(); private Request m_request = null; private Reply m_reply = null; private CallbackServerClientReadWriter m_callbackServerClientReadWriter; private CallbackStreamInvocationHandler m_callbackStreamInvocationHandler; private CallbackHostContext m_callbackHostContext; private CallbackServerClassFactory m_altrmiFactory; /* * @see ServerStreamReadWriter#initialize() */ protected void initialize() throws IOException { m_dataInputStream = new DataInputStream(getInputStream()); m_dataOutputStream = new DataOutputStream(new BufferedOutputStream(getOutputStream())); //start the receiving message loop m_messageLoopThread = getThreadContextProvider().getThreadContext(this); m_messageLoopThread.start(); } /** * Message Loop . * Notifies the Reply or Request monitor depending upon the type of message. * @see java.lang.Runnable#run() */ public void run() { m_callbackServerClientReadWriter = new CallbackServerClientReadWriter(this); m_callbackStreamInvocationHandler = new CallbackStreamInvocationHandler(this.getClass().getClassLoader()); m_callbackStreamInvocationHandler.setObjectReadWriter(m_callbackServerClientReadWriter); m_callbackHostContext = new CallbackHostContext(m_callbackStreamInvocationHandler); m_altrmiFactory = new CallbackServerClassFactory(); m_altrmiFactory.setInitializedHostContext(m_callbackHostContext); while (!m_isStopped) { ReqRepBytes reqRepBytes = ReqRepBytes.getRequestReplyBytesFromDataStream(m_dataInputStream); if (reqRepBytes.getIOException() != null) { IOException ioe = reqRepBytes.getIOException(); if (ioe != null) { if ((ioe instanceof EOFException && reqRepBytes.ioeDuringReadInt()) || ioe instanceof SocketException) { m_isStopped = true; notifyBoth(); // restart ? return; } } System.out.println("--> ##1 " + ioe.getClass().getName()); } //interpret it as a request or reply & notify the corresponding listeners // SHLD we need a queue here to hold the arriving packets // TODO:WORKAROUND: Dont receive until the data is handled some way if (reqRepBytes.isRequest()) { try { Object obj = SerializationHelper.getInstanceFromBytes(reqRepBytes.getBytes()); m_request = (Request) obj; } catch (ClassNotFoundException e) { System.out.println("--> TODO!!" + e.getMessage()); m_request = null; m_isStopped = true; return; } synchronized (m_requestLock) { m_requestLock.notify(); } } else // Reply. { try { Object obj = SerializationHelper.getInstanceFromBytes(reqRepBytes.getBytes()); m_reply = (Reply) obj; } catch (ClassNotFoundException e) { m_reply = new ExceptionReply(new InvocationException("ClassNotFoundException", e)); } synchronized (m_replyLock) { m_replyLock.notify(); } } } } private void notifyBoth() { m_isStopped = true; synchronized (m_requestLock) { m_requestLock.notify(); } synchronized (m_replyLock) { m_replyLock.notify(); } } /** * Method getRequestFromMessageLoop. * @return Request */ private Request getRequestFromMessageLoop() { //if(_replyQueue.size()==0) if (m_request == null) { synchronized (m_requestLock) { try { m_requestLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return m_request; } /** * Method getReplyFromMessageLoop. * @return Reply */ private Reply getReplyFromMessageLoop() { if (m_reply == null) { synchronized (m_replyLock) { try { m_replyLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return m_reply; } /* * @see ServerStreamReadWriter#writeReplyAndGetRequest(Reply) */ protected Request writeReplyAndGetRequest(Reply altrmiReply) throws IOException, ClassNotFoundException { if (altrmiReply != null) { writeReply(altrmiReply); } Request req = readRequest(); m_request = null; return req; } /** * Method writeReply. * @param altrmiReply * @throws java.io.IOException */ private void writeReply(Reply altrmiReply) throws IOException { byte[] aBytes = SerializationHelper.getBytesFromInstance(altrmiReply); m_dataOutputStream.writeInt(aBytes.length); m_dataOutputStream.writeBoolean(false); m_dataOutputStream.write(aBytes); m_dataOutputStream.flush(); } /** * Method readRequest. * @return Request */ private Request readRequest() { Request altrmiRequest = getRequestFromMessageLoop(); if (altrmiRequest instanceof MethodRequest) { correctArgs(((MethodRequest) altrmiRequest).getArgs()); } return altrmiRequest; } //client side /** * Method postRequest. * @param altrmiRequest * @return Reply * @throws java.io.IOException * @throws java.lang.ClassNotFoundException */ public Reply postRequest(Request altrmiRequest) throws IOException, ClassNotFoundException { if (m_isStopped) { throw new ConnectionClosedException("Client Closed Connection"); } writeRequest(altrmiRequest); Reply r = getReplyFromMessageLoop(); if (r == null) { throw new java.io.InterruptedIOException("Client Connection Closed"); } m_reply = null; return r; } /** * Method writeRequest. * @param altrmiRequest * @throws java.io.IOException */ private void writeRequest(Request altrmiRequest) throws IOException { byte[] aBytes = SerializationHelper.getBytesFromInstance(altrmiRequest); m_dataOutputStream.writeInt(aBytes.length); m_dataOutputStream.writeBoolean(true); m_dataOutputStream.write(aBytes); m_dataOutputStream.flush(); } /** * Method correctArgs. * UnMarshall the arguments . * @param args */ public void correctArgs(Object[] args) { for (int i = 0; i < args.length; i++) { if (args[i] instanceof ExposedObjectProxy) { ExposedObjectProxy exposedObjectProxy = (ExposedObjectProxy) args[i]; try { //lookup the client-side exported object. Object obj = m_altrmiFactory.lookup(exposedObjectProxy.getPublishedName()); args[i] = obj; } catch (Exception altrmiConnectionException) { altrmiConnectionException.printStackTrace(); } } } } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]