This is an automated email from the ASF dual-hosted git repository.

tv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-jcs.git
commit 4653fbc34e7f67c9f99f4a3c10dee8942bd6b068 Author: Thomas Vandahl <t...@apache.org> AuthorDate: Fri Mar 26 18:38:16 2021 +0100 Use IElementSerializer --- .../lateral/socket/tcp/LateralTCPSender.java | 104 ++++++++++----------- 1 file changed, 49 insertions(+), 55 deletions(-) diff --git a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java index bb75195..b73bf1f 100644 --- a/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java +++ b/commons-jcs-core/src/main/java/org/apache/commons/jcs3/auxiliary/lateral/socket/tcp/LateralTCPSender.java @@ -20,16 +20,19 @@ package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; */ import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor; import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; -import org.apache.commons.jcs3.io.ObjectInputStreamClassLoaderAware; +import org.apache.commons.jcs3.engine.behavior.IElementSerializer; import org.apache.commons.jcs3.log.Log; import org.apache.commons.jcs3.log.LogManager; +import org.apache.commons.jcs3.utils.serialization.StandardSerializer; /** * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so @@ -44,8 +47,8 @@ public class LateralTCPSender private final int socketOpenTimeOut; private final int socketSoTimeOut; - /** The stream from the server connection. */ - private ObjectOutputStream oos; + /** The serializer. 
*/ + private IElementSerializer serializer; /** The socket connection with the server. */ private Socket socket; @@ -54,7 +57,7 @@ public class LateralTCPSender private int sendCnt; /** Use to synchronize multiple threads that may be trying to get. */ - private final Object getLock = new int[0]; + private final Lock lock = new ReentrantLock(true); /** * Constructor for the LateralTCPSender object. @@ -68,14 +71,23 @@ public class LateralTCPSender this.socketOpenTimeOut = lca.getOpenTimeOut(); this.socketSoTimeOut = lca.getSocketTimeOut(); + this.serializer = new StandardSerializer(); + final String p1 = lca.getTcpServer(); if ( p1 == null ) { throw new IOException( "Invalid server (null)" ); } - final String h2 = p1.substring( 0, p1.indexOf( ":" ) ); - final int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) ); + final int colonPosition = p1.lastIndexOf(':'); + + if ( colonPosition < 0 ) + { + throw new IOException( "Invalid address [" + p1 + "]" ); + } + + final String h2 = p1.substring( 0, colonPosition ); + final int po = Integer.parseInt( p1.substring( colonPosition + 1 ) ); log.debug( "h2 = {0}, po = {1}", h2, po ); if ( h2.isEmpty() ) @@ -100,27 +112,17 @@ public class LateralTCPSender { log.info( "Attempting connection to [{0}]", host ); - // have time out socket open do this for us try { - socket = new Socket(); - socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut ); + this.socket = new Socket(); + this.socket.connect(new InetSocketAddress(host, port), this.socketOpenTimeOut); } catch ( final IOException ioe ) { - if (socket != null) - { - socket.close(); - } - throw new IOException( "Cannot connect to " + host + ":" + port, ioe ); } socket.setSoTimeout( socketSoTimeOut ); - synchronized ( this ) - { - oos = new ObjectOutputStream( socket.getOutputStream() ); - } } catch ( final java.net.ConnectException e ) { @@ -156,15 +158,17 @@ public class LateralTCPSender return; } - if ( oos == null ) + final OutputStream sos = 
socket.getOutputStream(); + + lock.lock(); + try { - throw new IOException( "No remote connection is available for LateralTCPSender." ); + serializer.serializeTo(led, sos); + sos.flush(); } - - synchronized ( this.getLock ) + finally { - oos.writeUnshared( led ); - oos.flush(); + lock.unlock(); } } @@ -186,20 +190,16 @@ public class LateralTCPSender return null; } - if ( oos == null ) - { - throw new IOException( "No remote connection is available for LateralTCPSender." ); - } - - Object response = null; - // Synchronized to insure that the get requests to server from this // sender and the responses are processed in order, else you could // return the wrong item from the cache. // This is a big block of code. May need to re-think this strategy. // This may not be necessary. // Normal puts, etc to laterals do not have to be synchronized. - synchronized ( this.getLock ) + Object response = null; + + lock.lock(); + try { try { @@ -216,26 +216,22 @@ public class LateralTCPSender } // write object to listener - oos.writeUnshared( led ); - oos.flush(); + send(led); - try (ObjectInputStream ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null )) - { - socket.setSoTimeout( socketSoTimeOut ); - response = ois.readObject(); - } - catch ( final IOException ioe ) - { - final String message = "Could not open ObjectInputStream to " + socket + - " SoTimeout [" + socket.getSoTimeout() + - "] Connected [" + socket.isConnected() + "]"; - log.error( message, ioe ); - throw ioe; - } - catch ( final Exception e ) - { - log.error( e ); - } + InputStream sis = socket.getInputStream(); + response = serializer.deSerializeFrom(sis, null); + } + catch ( final IOException | ClassNotFoundException ioe ) + { + final String message = "Could not open InputStream to " + socket + + " SoTimeout [" + socket.getSoTimeout() + + "] Connected [" + socket.isConnected() + "]"; + log.error( message, ioe ); + throw new IOException(message, ioe); + } + finally + { + lock.unlock(); } 
return response; @@ -252,8 +248,6 @@ public class LateralTCPSender throws IOException { log.info( "Dispose called" ); - // WILL CLOSE CONNECTION USED BY ALL - oos.close(); socket.close(); } }