hammant     2003/02/04 16:10:27

  Modified:    altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream
                        CallbackEnabledClientCustomStreamReadWriter.java
               altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket
                        CallbackEnabledSocketCustomStreamServer.java
                        CallbackServerClientReadWriter.java
                        PartialCallbackEnabledSocketCustomStreamServer.java
  Added:       altrmi/src/java/org/apache/excalibur/altrmi/common
                        ReqRepBytes.java
               altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/stream
                        CallbackEnabledCustomStreamReadWriter.java
  Removed:     altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream
                        ReqRepBytes.java
               altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket
                        CallbackEnabledSocketCustomStreamReadWriter.java
  Log:
  Updated server side of callbacks to be more robust
  
  Revision  Changes    Path
  1.16      +9 -26     
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream/CallbackEnabledClientCustomStreamReadWriter.java
  
  Index: CallbackEnabledClientCustomStreamReadWriter.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/client/impl/callback/stream/CallbackEnabledClientCustomStreamReadWriter.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- CallbackEnabledClientCustomStreamReadWriter.java  4 Feb 2003 14:36:29 -0000      
 1.15
  +++ CallbackEnabledClientCustomStreamReadWriter.java  5 Feb 2003 00:10:25 -0000      
 1.16
  @@ -15,6 +15,7 @@
   import java.io.OutputStream;
   import java.io.EOFException;
   import java.util.HashMap;
  +import java.net.SocketException;
   
   import org.apache.excalibur.altrmi.client.impl.ClientStreamReadWriter;
   import org.apache.excalibur.altrmi.common.CallbackException;
  @@ -26,6 +27,7 @@
   import org.apache.excalibur.altrmi.common.ThreadContext;
   import org.apache.excalibur.altrmi.common.ExceptionReply;
   import org.apache.excalibur.altrmi.common.InvocationException;
  +import org.apache.excalibur.altrmi.common.ReqRepBytes;
   import org.apache.excalibur.altrmi.server.Server;
   import org.apache.excalibur.altrmi.server.PublicationException;
   import org.apache.excalibur.altrmi.server.ServerException;
  @@ -99,14 +101,15 @@
       {
           while (!m_isStopped)
           {
  -            ReqRepBytes reqRepBytes = readByteArrayFromInputStream();
  +            ReqRepBytes reqRepBytes = 
ReqRepBytes.getRequestReplyBytesFromDataStream(m_dataInputStream);
   
               if (reqRepBytes.getIOException() != null)
               {
                   IOException ioe = reqRepBytes.getIOException();
  -                if (ioe instanceof EOFException)
  +                if (ioe != null)
                   {
  -                    if (reqRepBytes.ioeDuringReadInt())
  +                    if ((ioe instanceof EOFException && 
reqRepBytes.ioeDuringReadInt())
  +                            || ioe instanceof SocketException)
                       {
                           m_isStopped = true;
                           // restart ?
  @@ -117,7 +120,7 @@
               }
   
               // Is a reply.
  -            if (!(reqRepBytes.isRequest().booleanValue()))
  +            if (!(reqRepBytes.isRequest()))
               {
                   try
                   {
  @@ -134,7 +137,7 @@
                   }
               }
               // Is a request.
  -            else if (reqRepBytes.isRequest().booleanValue())
  +            else if (reqRepBytes.isRequest())
               {
                   try
                   {
  @@ -161,26 +164,6 @@
                   }
               }
           }
  -    }
  -
  -    private ReqRepBytes readByteArrayFromInputStream()
  -    {
  -        int byteArraySize = 0;
  -        Boolean isRequest = null;
  -        byte[] byteArray = null;
  -        IOException ioe = null;
  -        try
  -        {
  -            byteArraySize = m_dataInputStream.readInt();
  -            isRequest = m_dataInputStream.readBoolean() ? Boolean.TRUE : 
Boolean.FALSE;
  -            byteArray = new byte[byteArraySize];
  -            m_dataInputStream.read(byteArray);
  -        }
  -        catch (IOException e)
  -        {
  -            ioe = e;
  -        }
  -        return new ReqRepBytes(byteArraySize, byteArray, isRequest, ioe);
       }
   
       /**
  
  
  
  1.6       +3 -2      
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackEnabledSocketCustomStreamServer.java
  
  Index: CallbackEnabledSocketCustomStreamServer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackEnabledSocketCustomStreamServer.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- CallbackEnabledSocketCustomStreamServer.java      29 Jan 2003 08:01:13 -0000     
 1.5
  +++ CallbackEnabledSocketCustomStreamServer.java      5 Feb 2003 00:10:26 -0000      
 1.6
  @@ -9,6 +9,7 @@
   
   import org.apache.excalibur.altrmi.server.ServerException;
   import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter;
  +import 
org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter;
   import org.apache.excalibur.altrmi.server.impl.adapters.InvocationHandlerAdapter;
   import 
org.apache.excalibur.altrmi.server.impl.socket.AbstractCompleteSocketStreamServer;
   
  @@ -65,7 +66,7 @@
        */
       protected ServerStreamReadWriter createServerStreamReadWriter()
       {
  -        CallbackEnabledSocketCustomStreamReadWriter 
callbackEnabledSocketCustomStreamReadWriter = new 
CallbackEnabledSocketCustomStreamReadWriter();
  +        CallbackEnabledCustomStreamReadWriter 
callbackEnabledSocketCustomStreamReadWriter = new 
CallbackEnabledCustomStreamReadWriter();
           
callbackEnabledSocketCustomStreamReadWriter.setThreadContextProvider(getThreadContextProvider());
           return callbackEnabledSocketCustomStreamReadWriter;
       }
  
  
  
  1.7       +5 -4      
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackServerClientReadWriter.java
  
  Index: CallbackServerClientReadWriter.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/CallbackServerClientReadWriter.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- CallbackServerClientReadWriter.java       5 Jan 2003 23:24:45 -0000       1.6
  +++ CallbackServerClientReadWriter.java       5 Feb 2003 00:10:26 -0000       1.7
  @@ -11,6 +11,7 @@
   import org.apache.excalibur.altrmi.client.impl.ClientStreamReadWriter;
   import org.apache.excalibur.altrmi.common.Reply;
   import org.apache.excalibur.altrmi.common.Request;
  +import 
org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter;
   
   /**
    * Class CallbackServerClientReadWriter
  @@ -23,7 +24,7 @@
   public class CallbackServerClientReadWriter implements ClientStreamReadWriter
   {
   
  -    private CallbackEnabledSocketCustomStreamReadWriter 
m_callbackEnabledCustomSocketStreamReadWriter;
  +    private CallbackEnabledCustomStreamReadWriter 
m_callbackEnabledCustomSocketStreamReadWriter;
   
       /**
        * Constructor CallbackServerClientReadWriter
  @@ -32,8 +33,8 @@
        * @param callbackEnabledCustomSocketStreamReadWriter
        *
        */
  -    CallbackServerClientReadWriter(
  -        CallbackEnabledSocketCustomStreamReadWriter 
callbackEnabledCustomSocketStreamReadWriter )
  +    public CallbackServerClientReadWriter(
  +        CallbackEnabledCustomStreamReadWriter 
callbackEnabledCustomSocketStreamReadWriter )
       {
           this.m_callbackEnabledCustomSocketStreamReadWriter =
               callbackEnabledCustomSocketStreamReadWriter;
  
  
  
  1.5       +3 -2      
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/PartialCallbackEnabledSocketCustomStreamServer.java
  
  Index: PartialCallbackEnabledSocketCustomStreamServer.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/socket/PartialCallbackEnabledSocketCustomStreamServer.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- PartialCallbackEnabledSocketCustomStreamServer.java       11 Jan 2003 16:10:08 
-0000      1.4
  +++ PartialCallbackEnabledSocketCustomStreamServer.java       5 Feb 2003 00:10:26 
-0000       1.5
  @@ -8,6 +8,7 @@
   package org.apache.excalibur.altrmi.server.impl.callback.socket;
   
   import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter;
  +import 
org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackEnabledCustomStreamReadWriter;
   import org.apache.excalibur.altrmi.server.impl.adapters.InvocationHandlerAdapter;
   import 
org.apache.excalibur.altrmi.server.impl.socket.AbstractPartialSocketStreamServer;
   
  @@ -49,6 +50,6 @@
            */
       protected ServerStreamReadWriter createServerStreamReadWriter()
       {
  -        return new CallbackEnabledSocketCustomStreamReadWriter();
  +        return new CallbackEnabledCustomStreamReadWriter();
       }
   }
  
  
  
  1.1                  
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/common/ReqRepBytes.java
  
  Index: ReqRepBytes.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.excalibur.altrmi.common;
  
  import java.io.IOException;
  import java.io.DataInputStream;
  
  /**
   * ReqRepBytes
   *
   * @author Paul Hammant
   * @version $Revision: 1.1 $
   */
  public class ReqRepBytes {
      int m_byteSize;
      byte[] m_bytes;
      Boolean m_isRequest;
      IOException m_ioe;
  
      public ReqRepBytes(int byteSize, byte[] bytes, Boolean isRequest, IOException 
ioe)
      {
          m_byteSize = byteSize;
          m_bytes = bytes;
          m_isRequest = isRequest;
          m_ioe = ioe;
      }
  
      public boolean ioeDuringReadInt()
      {
          return (m_ioe != null & m_byteSize == 0);
      }
  
      public int getByteSize()
      {
          return m_byteSize;
      }
  
      public byte[] getBytes()
      {
          return m_bytes;
      }
  
      // request or reply
      public boolean isRequest()
      {
          return m_isRequest.booleanValue();
      }
  
      public IOException getIOException()
      {
          return m_ioe;
      }
  
      public static ReqRepBytes getRequestReplyBytesFromDataStream(DataInputStream dis)
      {
          int byteArraySize = 0;
          Boolean isRequest = null;
          byte[] byteArray = null;
          IOException ioe = null;
          try
          {
              byteArraySize = dis.readInt();
              isRequest = dis.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
              byteArray = new byte[ byteArraySize ];
              dis.read( byteArray );
          }
          catch (IOException e)
          {
              ioe = e;
          }
          return new ReqRepBytes(byteArraySize, byteArray, isRequest, ioe);
      }
  
  
  
  }
  
  
  
  1.1                  
jakarta-avalon-excalibur/altrmi/src/java/org/apache/excalibur/altrmi/server/impl/callback/stream/CallbackEnabledCustomStreamReadWriter.java
  
  Index: CallbackEnabledCustomStreamReadWriter.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.excalibur.altrmi.server.impl.callback.stream;
  
  import java.io.BufferedOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.EOFException;
  import java.net.SocketException;
  
  import org.apache.excalibur.altrmi.client.ConnectionClosedException;
  import org.apache.excalibur.altrmi.common.Reply;
  import org.apache.excalibur.altrmi.common.Request;
  import org.apache.excalibur.altrmi.common.ExposedObjectProxy;
  import org.apache.excalibur.altrmi.common.MethodRequest;
  import org.apache.excalibur.altrmi.common.SerializationHelper;
  import org.apache.excalibur.altrmi.common.ThreadContext;
  import org.apache.excalibur.altrmi.common.ReqRepBytes;
  import org.apache.excalibur.altrmi.common.ExceptionReply;
  import org.apache.excalibur.altrmi.common.InvocationException;
  import org.apache.excalibur.altrmi.server.impl.ServerStreamReadWriter;
  import org.apache.excalibur.altrmi.server.impl.callback.CallbackHostContext;
  import org.apache.excalibur.altrmi.server.impl.callback.CallbackServerClassFactory;
  import 
org.apache.excalibur.altrmi.server.impl.callback.socket.CallbackServerClientReadWriter;
  import 
org.apache.excalibur.altrmi.server.impl.callback.stream.CallbackStreamInvocationHandler;
  
  /**
   * CallbackEnabledCustomStreamReadWriter
   *  This StreamReadWriter is responsible for unmarshalling
   *      exposedClient objects too and devliering it to the Remote Object.
   *  The calls made for the exposed Client Object is also marshalled
   *      out here with the aid of the internal InvocationHandler.
   *  The callbacks are multiplexed over the same connection and thus
   *      there are no extra network connections needed to enable callbacks.
   *
   * @author <a href="mailto:[EMAIL PROTECTED]";>Vinay Chandran</a>
   * @author Paul Hammant
   * @version $Revision: 1.1 $
   */
  public class CallbackEnabledCustomStreamReadWriter extends ServerStreamReadWriter
          implements Runnable
  {
  
      private DataInputStream m_dataInputStream;
      private DataOutputStream m_dataOutputStream;
      private ThreadContext m_messageLoopThread = null;
      private boolean m_isStopped = false;
      private Object m_requestLock = new Object();
      private Object m_replyLock = new Object();
      private Request m_request = null;
      private Reply m_reply = null;
      private CallbackServerClientReadWriter m_callbackServerClientReadWriter;
      private CallbackStreamInvocationHandler m_callbackStreamInvocationHandler;
      private CallbackHostContext m_callbackHostContext;
      private CallbackServerClassFactory m_altrmiFactory;
  
      /*
       * @see ServerStreamReadWriter#initialize()
       */
      protected void initialize() throws IOException
      {
  
          m_dataInputStream = new DataInputStream(getInputStream());
          m_dataOutputStream = new DataOutputStream(new 
BufferedOutputStream(getOutputStream()));
  
          //start the receiving message loop
          m_messageLoopThread = getThreadContextProvider().getThreadContext(this);
          m_messageLoopThread.start();
      }
  
      /**
       * Message Loop .
       * Notifies the Reply or Request monitor depending upon the type of message.
       * @see java.lang.Runnable#run()
       */
      public void run()
      {
  
          m_callbackServerClientReadWriter = new CallbackServerClientReadWriter(this);
          m_callbackStreamInvocationHandler =
                  new 
CallbackStreamInvocationHandler(this.getClass().getClassLoader());
  
          
m_callbackStreamInvocationHandler.setObjectReadWriter(m_callbackServerClientReadWriter);
  
          m_callbackHostContext = new 
CallbackHostContext(m_callbackStreamInvocationHandler);
          m_altrmiFactory = new CallbackServerClassFactory();
  
          m_altrmiFactory.setInitializedHostContext(m_callbackHostContext);
  
          while (!m_isStopped)
          {
  
              ReqRepBytes reqRepBytes = 
ReqRepBytes.getRequestReplyBytesFromDataStream(m_dataInputStream);
  
              if (reqRepBytes.getIOException() != null)
              {
                  IOException ioe = reqRepBytes.getIOException();
                  if (ioe != null)
                  {
                      if ((ioe instanceof EOFException && 
reqRepBytes.ioeDuringReadInt())
                              || ioe instanceof SocketException)
                      {
                          m_isStopped = true;
                          notifyBoth();
                          // restart ?
                          return;
                      }
                  }
  
                  System.out.println("--> ##1 " + ioe.getClass().getName());
              }
  
  
              //interpret it as a request or reply & notify the corresponding listeners
              // SHLD we need a queue here to hold the arriving packets
              // TODO:WORKAROUND: Dont receive until the data is handled some way
              if (reqRepBytes.isRequest())
              {
                  try
                  {
                      Object obj = 
SerializationHelper.getInstanceFromBytes(reqRepBytes.getBytes());
                      m_request = (Request) obj;
                  }
                  catch (ClassNotFoundException e)
                  {
                      System.out.println("--> TODO!!" + e.getMessage());
                      m_request = null;
                      m_isStopped = true;
                      return;
                  }
  
                  synchronized (m_requestLock)
                  {
                      m_requestLock.notify();
                  }
              }
              else // Reply.
              {
                  try
                  {
                      Object obj = 
SerializationHelper.getInstanceFromBytes(reqRepBytes.getBytes());
                      m_reply = (Reply) obj;
                  }
                  catch (ClassNotFoundException e)
                  {
                      m_reply = new ExceptionReply(new 
InvocationException("ClassNotFoundException", e));
                  }
  
                  synchronized (m_replyLock)
                  {
                      m_replyLock.notify();
                  }
              }
          }
      }
  
      private void notifyBoth()
      {
          m_isStopped = true;
  
          synchronized (m_requestLock)
          {
              m_requestLock.notify();
          }
  
          synchronized (m_replyLock)
          {
              m_replyLock.notify();
          }
      }
  
  
      /**
       * Method getRequestFromMessageLoop.
       * @return Request
       */
      private Request getRequestFromMessageLoop()
      {
  
          //if(_replyQueue.size()==0)
          if (m_request == null)
          {
              synchronized (m_requestLock)
              {
                  try
                  {
                      m_requestLock.wait();
                  }
                  catch (InterruptedException e)
                  {
                      e.printStackTrace();
                  }
              }
          }
  
          return m_request;
      }
  
      /**
       * Method getReplyFromMessageLoop.
       * @return Reply
       */
      private Reply getReplyFromMessageLoop()
      {
  
          if (m_reply == null)
          {
              synchronized (m_replyLock)
              {
                  try
                  {
                      m_replyLock.wait();
                  }
                  catch (InterruptedException e)
                  {
                      e.printStackTrace();
                  }
              }
          }
  
          return m_reply;
      }
  
      /*
       * @see ServerStreamReadWriter#writeReplyAndGetRequest(Reply)
       */
      protected Request writeReplyAndGetRequest(Reply altrmiReply)
              throws IOException, ClassNotFoundException
      {
  
          if (altrmiReply != null)
          {
              writeReply(altrmiReply);
          }
  
          Request req = readRequest();
  
          m_request = null;
  
          return req;
      }
  
      /**
       * Method writeReply.
       * @param altrmiReply
       * @throws java.io.IOException
       */
      private void writeReply(Reply altrmiReply) throws IOException
      {
  
          byte[] aBytes = SerializationHelper.getBytesFromInstance(altrmiReply);
  
          m_dataOutputStream.writeInt(aBytes.length);
          m_dataOutputStream.writeBoolean(false);
          m_dataOutputStream.write(aBytes);
          m_dataOutputStream.flush();
      }
  
      /**
       * Method readRequest.
       * @return Request
       */
      private Request readRequest()
      {
  
          Request altrmiRequest = getRequestFromMessageLoop();
  
          if (altrmiRequest instanceof MethodRequest)
          {
              correctArgs(((MethodRequest) altrmiRequest).getArgs());
          }
  
          return altrmiRequest;
      }
  
      //client side
  
      /**
       * Method postRequest.
       * @param altrmiRequest
       * @return Reply
       * @throws java.io.IOException
       * @throws java.lang.ClassNotFoundException
       */
      public Reply postRequest(Request altrmiRequest)
              throws IOException, ClassNotFoundException
      {
  
          if (m_isStopped)
          {
              throw new ConnectionClosedException("Client Closed Connection");
          }
  
          writeRequest(altrmiRequest);
  
          Reply r = getReplyFromMessageLoop();
  
          if (r == null)
          {
              throw new java.io.InterruptedIOException("Client Connection Closed");
          }
  
          m_reply = null;
  
          return r;
      }
  
      /**
       * Method writeRequest.
       * @param altrmiRequest
       * @throws java.io.IOException
       */
      private void writeRequest(Request altrmiRequest) throws IOException
      {
  
          byte[] aBytes = SerializationHelper.getBytesFromInstance(altrmiRequest);
  
          m_dataOutputStream.writeInt(aBytes.length);
          m_dataOutputStream.writeBoolean(true);
          m_dataOutputStream.write(aBytes);
          m_dataOutputStream.flush();
      }
  
      /**
       * Method correctArgs.
       *      UnMarshall the arguments .
       * @param args
       */
      public void correctArgs(Object[] args)
      {
  
          for (int i = 0; i < args.length; i++)
          {
              if (args[i] instanceof ExposedObjectProxy)
              {
                  ExposedObjectProxy exposedObjectProxy = (ExposedObjectProxy) args[i];
  
                  try
                  {
  
                      //lookup the client-side exported object.
                      Object obj = 
m_altrmiFactory.lookup(exposedObjectProxy.getPublishedName());
  
                      args[i] = obj;
                  }
                  catch (Exception altrmiConnectionException)
                  {
                      altrmiConnectionException.printStackTrace();
                  }
              }
          }
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to