This is an automated email from the ASF dual-hosted git repository.

amichai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-rsa.git

commit 8795cde0cabf4c56644083dade04cb6f6821ed66
Author: Amichai Rothman <[email protected]>
AuthorDate: Thu Sep 14 21:41:26 2023 +0300

    ARIES-2119 - Fix TCPServer concurrency
---
 .../apache/aries/rsa/provider/tcp/TCPServer.java   | 54 ++++++++++++----------
 1 file changed, 29 insertions(+), 25 deletions(-)

diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index 9702544a..e5fb5b4c 100644
--- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -26,12 +26,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.apache.aries.rsa.provider.tcp.ser.BasicObjectOutputStream;
 import org.apache.aries.rsa.provider.tcp.ser.BasicObjectInputStream;
@@ -43,7 +38,7 @@ public class TCPServer implements Closeable, Runnable {
     private Logger log = LoggerFactory.getLogger(TCPServer.class);
     private ServerSocket serverSocket;
     private Object service;
-    private boolean running;
+    private volatile boolean running;
     private ExecutorService executor;
     private MethodInvoker invoker;
 
@@ -57,10 +52,10 @@ public class TCPServer implements Closeable, Runnable {
             throw new RuntimeException(e);
         }
         this.running = true;
-        this.executor = Executors.newCachedThreadPool();
-        for (int c = 0; c < numThreads; c++) {
-            this.executor.execute(this);
-        }
+        numThreads++; // plus one for server socket accepting thread
+        this.executor = new ThreadPoolExecutor(numThreads, numThreads,
+            60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+        this.executor.execute(this); // server socket thread
     }
 
     int getPort() {
@@ -68,25 +63,34 @@ public class TCPServer implements Closeable, Runnable {
     }
 
     public void run() {
-        ClassLoader serviceCL = service.getClass().getClassLoader();
         while (running) {
-            try (
-                    Socket socket = this.serverSocket.accept();
-                    ObjectInputStream ois = new BasicObjectInputStream(socket.getInputStream(), serviceCL);
-                    ObjectOutputStream objectOutput = new BasicObjectOutputStream(socket.getOutputStream())
-                ) {
-                handleCall(ois, objectOutput);
-            } catch (SocketException e) {
+            try {
+                Socket socket = serverSocket.accept();
+                executor.execute(() -> handleConnection(socket));
+            } catch (SocketException e) { // server socket is closed
                 running = false;
             } catch (Exception e) {
-                log.warn("Error processing service call.", e);
+                log.warn("Error processing connection", e);
             }
         }
     }
 
-    private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception {
-        String methodName = (String)ois.readObject();
-        Object[] args = (Object[])ois.readObject();
+    private void handleConnection(Socket socket) {
+        ClassLoader serviceCL = service.getClass().getClassLoader();
+        try (Socket sock = socket;
+             ObjectInputStream in = new BasicObjectInputStream(socket.getInputStream(), serviceCL);
+             ObjectOutputStream out = new BasicObjectOutputStream(socket.getOutputStream())) {
+            handleCall(in, out);
+        } catch (SocketException se) {
+            return; // e.g. connection closed by client
+        } catch (Exception e) {
+            log.warn("Error processing service call", e);
+        }
+    }
+
+    private void handleCall(ObjectInputStream in, ObjectOutputStream out) throws Exception {
+        String methodName = (String)in.readObject();
+        Object[] args = (Object[])in.readObject();
         Throwable error = null;
         Object result = null;
         try {
@@ -94,8 +98,8 @@ public class TCPServer implements Closeable, Runnable {
         } catch (Throwable t) {
             error = t;
         }
-        objectOutput.writeObject(error);
-        objectOutput.writeObject(result);
+        out.writeObject(error);
+        out.writeObject(result);
     }
 
     @SuppressWarnings("unchecked")

Reply via email to