This is an automated email from the ASF dual-hosted git repository. amichai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/aries-rsa.git
commit 8795cde0cabf4c56644083dade04cb6f6821ed66 Author: Amichai Rothman <[email protected]> AuthorDate: Thu Sep 14 21:41:26 2023 +0300 ARIES-2119 - Fix TCPServer concurrency --- .../apache/aries/rsa/provider/tcp/TCPServer.java | 54 ++++++++++++---------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java index 9702544a..e5fb5b4c 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java @@ -26,12 +26,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.aries.rsa.provider.tcp.ser.BasicObjectOutputStream; import org.apache.aries.rsa.provider.tcp.ser.BasicObjectInputStream; @@ -43,7 +38,7 @@ public class TCPServer implements Closeable, Runnable { private Logger log = LoggerFactory.getLogger(TCPServer.class); private ServerSocket serverSocket; private Object service; - private boolean running; + private volatile boolean running; private ExecutorService executor; private MethodInvoker invoker; @@ -57,10 +52,10 @@ public class TCPServer implements Closeable, Runnable { throw new RuntimeException(e); } this.running = true; - this.executor = Executors.newCachedThreadPool(); - for (int c = 0; c < numThreads; c++) { - this.executor.execute(this); - } + numThreads++; // plus one for server socket accepting thread + this.executor = new ThreadPoolExecutor(numThreads, numThreads, + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + this.executor.execute(this); // server socket thread } int getPort() { @@ -68,25 +63,34 @@ public class TCPServer implements Closeable, Runnable { } public void run() { - ClassLoader serviceCL = service.getClass().getClassLoader(); while (running) { - try ( - Socket socket = this.serverSocket.accept(); - ObjectInputStream ois = new BasicObjectInputStream(socket.getInputStream(), serviceCL); - ObjectOutputStream objectOutput = new BasicObjectOutputStream(socket.getOutputStream()) - ) { - handleCall(ois, objectOutput); - } catch (SocketException e) { + try { + Socket socket = serverSocket.accept(); + executor.execute(() -> handleConnection(socket)); + } catch (SocketException e) { // server socket is closed running = false; } catch (Exception e) { - log.warn("Error processing service call.", e); + log.warn("Error processing connection", e); } } } - private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception { - String methodName = (String)ois.readObject(); - Object[] args = (Object[])ois.readObject(); + private void handleConnection(Socket socket) { + ClassLoader serviceCL = service.getClass().getClassLoader(); + try (Socket sock = socket; + ObjectInputStream in = new BasicObjectInputStream(socket.getInputStream(), serviceCL); + ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream())) { + handleCall(in, out); + } catch (SocketException se) { + return; // e.g. connection closed by client + } catch (Exception e) { + log.warn("Error processing service call", e); + } + } + + private void handleCall(ObjectInputStream in, ObjectOutputStream out) throws Exception { + String methodName = (String)in.readObject(); + Object[] args = (Object[])in.readObject(); Throwable error = null; Object result = null; try { @@ -94,8 +98,8 @@ public class TCPServer implements Closeable, Runnable { } catch (Throwable t) { error = t; } - objectOutput.writeObject(error); - objectOutput.writeObject(result); + out.writeObject(error); + out.writeObject(result); } @SuppressWarnings("unchecked")
