[ https://issues.apache.org/jira/browse/SCB-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577975#comment-16577975 ]

ASF GitHub Bot commented on SCB-690:
------------------------------------

zhengyangyong closed pull request #862: [SCB-690] add check if connection is still connected before execute invocation
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/862
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/servicecomb/core/Transport.java b/core/src/main/java/org/apache/servicecomb/core/Transport.java
index 026ed7c4e..ec90a694c 100644
--- a/core/src/main/java/org/apache/servicecomb/core/Transport.java
+++ b/core/src/main/java/org/apache/servicecomb/core/Transport.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.core;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
@@ -53,4 +54,6 @@ default boolean canInit() {
   void send(Invocation invocation, AsyncResponse asyncResp) throws Exception;
 
   AtomicInteger getConnectedCounter();
+
+  Set<String> getConnectedAddresses();
 }
diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
index ff5370afd..d1d3aba3e 100644
--- a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
+++ b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
@@ -23,6 +23,7 @@
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -42,6 +43,7 @@
 import com.netflix.config.DynamicPropertyFactory;
 
 import io.vertx.core.Vertx;
+import io.vertx.core.impl.ConcurrentHashSet;
 
 public abstract class AbstractTransport implements Transport {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractTransport.class);
@@ -68,6 +70,8 @@
 
   private final AtomicInteger connectedCounter = new AtomicInteger(0);
 
+  private final Set<String> connectedAddresses = new ConcurrentHashSet<>();
+
   @Override
   public Endpoint getPublishEndpoint() {
     return publishEndpoint;
@@ -83,6 +87,11 @@ public AtomicInteger getConnectedCounter() {
     return connectedCounter;
   }
 
+  @Override
+  public Set<String> getConnectedAddresses() {
+    return connectedAddresses;
+  }
+
   protected void setListenAddressWithoutSchema(String addressWithoutSchema) {
     setListenAddressWithoutSchema(addressWithoutSchema, null);
   }
@@ -116,9 +125,9 @@ private String genAddressWithoutSchema(String 
addressWithoutSchema, Map<String,
       addressWithoutSchema += "&";
     }
 
-    String encodedQuery = 
URLEncodedUtils.format(pairs.entrySet().stream().map(entry -> {
-      return new BasicNameValuePair(entry.getKey(), entry.getValue());
-    }).collect(Collectors.toList()), StandardCharsets.UTF_8.name());
+    String encodedQuery = URLEncodedUtils.format(
+        pairs.entrySet().stream().map(entry -> new 
BasicNameValuePair(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toList()), StandardCharsets.UTF_8.name());
 
     if 
(!RegistryUtils.getServiceRegistry().getFeatures().isCanEncodeEndpoint()) {
       addressWithoutSchema = 
genAddressWithoutSchemaForOldSC(addressWithoutSchema, encodedQuery);
diff --git a/core/src/test/java/org/apache/servicecomb/core/TestTransport.java b/core/src/test/java/org/apache/servicecomb/core/TestTransport.java
index abfa20252..054905de5 100644
--- a/core/src/test/java/org/apache/servicecomb/core/TestTransport.java
+++ b/core/src/test/java/org/apache/servicecomb/core/TestTransport.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.core.endpoint.EndpointsCache;
@@ -34,6 +35,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.vertx.core.impl.ConcurrentHashSet;
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
@@ -52,6 +54,11 @@ public AtomicInteger getConnectedCounter() {
         return new AtomicInteger(0);
       }
 
+      @Override
+      public Set<String> getConnectedAddresses() {
+        return new ConcurrentHashSet<>();
+      }
+
       @Override
       public Object parseAddress(String address) {
         return "127.0.0.1";
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
index 7fc3f24f4..202189f31 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.foundation.vertx.server;
 
 import java.net.InetSocketAddress;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.foundation.common.event.EventManager;
@@ -42,9 +43,12 @@
 
   private final AtomicInteger connectedCounter;
 
-  public TcpServer(URIEndpointObject endpointObject, AtomicInteger 
connectedCounter) {
+  private final Set<String> connectedAddresses;
+
+  public TcpServer(URIEndpointObject endpointObject, AtomicInteger 
connectedCounter, Set<String> connectedAddresses) {
     this.endpointObject = endpointObject;
     this.connectedCounter = connectedCounter;
+    this.connectedAddresses = connectedAddresses;
   }
 
   public void init(Vertx vertx, String sslKey, 
AsyncResultCallback<InetSocketAddress> callback) {
@@ -76,10 +80,11 @@ public void init(Vertx vertx, String sslKey, 
AsyncResultCallback<InetSocketAddre
         return;
       }
 
+      String address = netSocket.remoteAddress().toString();
+      connectedAddresses.add(address);
       TcpServerConnection connection = createTcpServerConnection();
-      connection.init(netSocket, connectedCounter);
-      EventManager.post(new ClientEvent(netSocket.remoteAddress().toString(),
-          ConnectionEvent.Connected, TransportType.Highway, connectedCount));
+      connection.init(netSocket, connectedCounter, connectedAddresses);
+      EventManager.post(new ClientEvent(address, ConnectionEvent.Connected, 
TransportType.Highway, connectedCount));
     });
 
     InetSocketAddress socketAddress = endpointObject.getSocketAddress();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
index 18120a477..8137d77cb 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.servicecomb.foundation.vertx.server;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.foundation.common.event.EventManager;
@@ -34,7 +35,7 @@
 
   protected TcpParser splitter;
 
-  public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
+  public void init(NetSocket netSocket, AtomicInteger connectedCounter, 
Set<String> connectedAddresses) {
     // currently, socket always be NetSocketImpl
     this.initNetSocket((NetSocketImpl) netSocket);
 
@@ -43,7 +44,7 @@ public void init(NetSocket netSocket, AtomicInteger 
connectedCounter) {
         remoteAddress,
         Thread.currentThread().getName());
     netSocket.exceptionHandler(e -> {
-      LOGGER.error("disconected from {}, in thread {}, cause {}",
+      LOGGER.error("exception from {}, in thread {}, cause {}",
           remoteAddress,
           Thread.currentThread().getName(),
           e.getMessage());
@@ -52,7 +53,7 @@ public void init(NetSocket netSocket, AtomicInteger 
connectedCounter) {
       LOGGER.error("disconected from {}, in thread {}",
           remoteAddress,
           Thread.currentThread().getName());
-
+      connectedAddresses.remove(remoteAddress);
       int connectedCount = connectedCounter.decrementAndGet();
       EventManager.post(new ClientEvent(remoteAddress, ConnectionEvent.Closed, 
TransportType.Highway, connectedCount));
     });
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
index 59825f5c9..3103e3340 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.foundation.vertx.server;
 
 import java.net.InetSocketAddress;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
@@ -26,6 +27,7 @@
 
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.net.NetServer;
 import io.vertx.core.net.NetServerOptions;
 import io.vertx.core.net.NetSocket;
@@ -35,15 +37,15 @@
 public class TestTcpServer {
   static class TcpServerForTest extends TcpServer {
     public TcpServerForTest(URIEndpointObject endpointObject) {
-      super(endpointObject, new AtomicInteger());
+      super(endpointObject, new AtomicInteger(), new ConcurrentHashSet<>());
     }
 
     @Override
     protected TcpServerConnection createTcpServerConnection() {
       return new TcpServerConnection() {
         @Override
-        public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
-          super.init(netSocket, connectedCounter);
+        public void init(NetSocket netSocket, AtomicInteger connectedCounter, 
Set<String> connectedAddresses) {
+          super.init(netSocket, connectedCounter, connectedAddresses);
         }
       };
     }
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
index 08b39780b..172f5377a 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
@@ -21,6 +21,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.net.impl.NetSocketImpl;
 import mockit.Mocked;
 
@@ -31,7 +32,7 @@ public void test(@Mocked NetSocketImpl netSocket) {
     connection.setProtocol("p");
     connection.setZipName("z");
 
-    connection.init(netSocket, new AtomicInteger());
+    connection.init(netSocket, new AtomicInteger(), new ConcurrentHashSet<>());
 
     Assert.assertEquals(netSocket, connection.getNetSocket());
   }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
index 0694c2de3..efe299e4c 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.core.Endpoint;
@@ -27,8 +28,8 @@
 public class HighwayServer extends TcpServer {
   private Endpoint endpoint;
 
-  public HighwayServer(Endpoint endpoint, AtomicInteger connectedCounter) {
-    super((URIEndpointObject) endpoint.getAddress(), connectedCounter);
+  public HighwayServer(Endpoint endpoint, AtomicInteger connectedCounter, 
Set<String> connectedAddresses) {
+    super((URIEndpointObject) endpoint.getAddress(), connectedCounter, 
connectedAddresses);
     this.endpoint = endpoint;
   }
 
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index 1f5a7593b..b29c439f8 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.servicecomb.transport.highway;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.ws.rs.core.Response.Status;
@@ -42,14 +43,24 @@
 
   private ProtobufFeature protobufFeature = new ProtobufFeature();
 
+  private Set<String> connectedAddresses;
+
+  private NetSocket netSocket;
+
+  public Set<String> getConnectedAddresses() {
+    return connectedAddresses;
+  }
+
   public HighwayServerConnection(Endpoint endpoint) {
     this.endpoint = endpoint;
   }
 
   @Override
-  public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
+  public void init(NetSocket netSocket, AtomicInteger connectedCounter, 
Set<String> connectedAddresses) {
     splitter = new TcpParser(this);
-    super.init(netSocket, connectedCounter);
+    this.netSocket = netSocket;
+    this.connectedAddresses = connectedAddresses;
+    super.init(netSocket, connectedCounter, connectedAddresses);
   }
 
   @Override
@@ -59,21 +70,24 @@ public void handle(long msgId, Buffer headerBuffer, Buffer 
bodyBuffer) {
       return;
     }
 
-    switch (requestHeader.getMsgType()) {
-      case MsgType.REQUEST:
-        onRequest(msgId, requestHeader, bodyBuffer);
-        break;
-      case MsgType.LOGIN:
-        onLogin(msgId, requestHeader, bodyBuffer);
-        break;
-
-      default:
-        throw new Error("Unknown tcp msgType " + requestHeader.getMsgType());
+    //if connection closed no need invoke
+    if (connectedAddresses.contains(netSocket.remoteAddress().toString())) {
+      switch (requestHeader.getMsgType()) {
+        case MsgType.REQUEST:
+          onRequest(msgId, requestHeader, bodyBuffer);
+          break;
+        case MsgType.LOGIN:
+          onLogin(msgId, requestHeader, bodyBuffer);
+          break;
+
+        default:
+          throw new Error("Unknown tcp msgType " + requestHeader.getMsgType());
+      }
     }
   }
 
   protected RequestHeader decodeRequestHeader(long msgId, Buffer headerBuffer) 
{
-    RequestHeader requestHeader = null;
+    RequestHeader requestHeader;
     try {
       requestHeader = HighwayCodec.readRequestHeader(headerBuffer, 
protobufFeature);
     } catch (Exception e) {
@@ -89,7 +103,7 @@ protected RequestHeader decodeRequestHeader(long msgId, 
Buffer headerBuffer) {
   }
 
   protected void onLogin(long msgId, RequestHeader header, Buffer bodyBuffer) {
-    LoginRequest request = null;
+    LoginRequest request;
     try {
       request = LoginRequest.readObject(bodyBuffer);
     } catch (Exception e) {
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
index 9a17081d5..e5eb8c9eb 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.transport.highway;
 
 import java.net.InetSocketAddress;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.core.Const;
@@ -44,12 +45,16 @@
 
   private final AtomicInteger connectedCounter;
 
+  private final Set<String> connectedAddresses;
+
   public HighwayServerVerticle() {
-    
this(CseContext.getInstance().getTransportManager().findTransport(Const.HIGHWAY).getConnectedCounter());
+    
this(CseContext.getInstance().getTransportManager().findTransport(Const.HIGHWAY).getConnectedCounter(),
+        
CseContext.getInstance().getTransportManager().findTransport(Const.HIGHWAY).getConnectedAddresses());
   }
 
-  public HighwayServerVerticle(AtomicInteger connectedCounter) {
+  public HighwayServerVerticle(AtomicInteger connectedCounter, Set<String> 
connectedAddresses) {
     this.connectedCounter = connectedCounter;
+    this.connectedAddresses = connectedAddresses;
   }
 
   @Override
@@ -79,7 +84,7 @@ protected void startListen(Future<Void> startFuture) {
       return;
     }
 
-    HighwayServer server = new HighwayServer(endpoint, connectedCounter);
+    HighwayServer server = new HighwayServer(endpoint, connectedCounter, 
connectedAddresses);
     server.init(vertx, SSL_KEY, ar -> {
       if (ar.succeeded()) {
         InetSocketAddress socketAddress = ar.result();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index 4f49fbc6f..82cb4502e 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -39,6 +39,7 @@
 import io.protostuff.LinkedBuffer;
 import io.protostuff.ProtobufOutput;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.net.NetSocket;
 import io.vertx.core.net.impl.NetSocketImpl;
 import io.vertx.core.net.impl.SocketAddressImpl;
@@ -76,7 +77,7 @@ public void init() {
       }
     };
     connection = new HighwayServerConnection(endpoint);
-    connection.init(netSocket, new AtomicInteger());
+    connection.init(netSocket, new AtomicInteger(), new ConcurrentHashSet<>());
 
     header = new RequestHeader();
   }
@@ -87,6 +88,7 @@ public void testInvalidMsgType() throws Exception {
     Buffer headerBuffer = createBuffer(requestHeaderSchema, header);
 
     try {
+      
connection.getConnectedAddresses().add(connection.getNetSocket().remoteAddress().toString());
       connection.handle(0, headerBuffer, null);
       throw new Error("must error");
     } catch (Throwable e) {
@@ -94,6 +96,7 @@ public void testInvalidMsgType() throws Exception {
     }
   }
 
+
   @Test
   public void testReqeustHeaderError() throws Exception {
     header.setMsgType(MsgType.LOGIN);
@@ -101,10 +104,11 @@ public void testReqeustHeaderError() throws Exception {
 
     headerBuffer.setByte(0, (byte) 100);
 
+    
connection.getConnectedAddresses().add(connection.getNetSocket().remoteAddress().toString());
     connection.handle(0, headerBuffer, null);
 
-    Assert.assertEquals(null, connection.getProtocol());
-    Assert.assertEquals(null, connection.getZipName());
+    Assert.assertNull(connection.getProtocol());
+    Assert.assertNull(connection.getZipName());
   }
 
   @Test
@@ -117,6 +121,7 @@ public void testSetParameterNormal() throws Exception {
     body.setZipName("z");
     Buffer bodyBuffer = createBuffer(setParameterRequestSchema, body);
 
+    
connection.getConnectedAddresses().add(connection.getNetSocket().remoteAddress().toString());
     connection.handle(0, headerBuffer, bodyBuffer);
 
     Assert.assertEquals("p", connection.getProtocol());
@@ -136,8 +141,8 @@ public void testSetParameterError() throws Exception {
 
     connection.handle(0, headerBuffer, bodyBuffer);
 
-    Assert.assertEquals(null, connection.getProtocol());
-    Assert.assertEquals(null, connection.getZipName());
+    Assert.assertNull(connection.getProtocol());
+    Assert.assertNull(connection.getZipName());
   }
 
   @Test
@@ -182,13 +187,40 @@ public void execute() {
       }
     };
 
+    
connection.getConnectedAddresses().add(connection.getNetSocket().remoteAddress().toString());
     connection.handle(0, headerBuffer, bodyBuffer);
 
-    Assert.assertEquals(null, connection.getProtocol());
-    Assert.assertEquals(null, connection.getZipName());
+    Assert.assertNull(connection.getProtocol());
+    Assert.assertNull(connection.getZipName());
     Assert.assertEquals(true, holder.value);
   }
 
+  @Test
+  public void testConnectionClosed(@Mocked MicroserviceMeta microserviceMeta, 
@Mocked OperationMeta operationMeta,
+      @Mocked SchemaMeta schemaMeta) throws Exception {
+    header.setMsgType(MsgType.REQUEST);
+    Buffer headerBuffer = createBuffer(requestHeaderSchema, header);
+
+    Buffer bodyBuffer = Buffer.buffer();
+
+    Holder<Boolean> holder = new Holder<>();
+    new MockUp<HighwayServerInvoke>() {
+      @Mock
+      public boolean init(NetSocket netSocket, long msgId,
+          RequestHeader header, Buffer bodyBuffer) {
+        return true;
+      }
+
+      @Mock
+      public void execute() {
+        holder.value = true;
+      }
+    };
+
+    connection.handle(0, headerBuffer, bodyBuffer);
+    Assert.assertNull(holder.value);
+  }
+
   @Test
   public void testRequestError() throws Exception {
     header.setMsgType(MsgType.REQUEST);
@@ -207,8 +239,8 @@ public boolean init(NetSocket netSocket, long msgId,
 
     connection.handle(0, headerBuffer, bodyBuffer);
 
-    Assert.assertEquals(null, connection.getProtocol());
-    Assert.assertEquals(null, connection.getZipName());
+    Assert.assertNull(connection.getProtocol());
+    Assert.assertNull(connection.getZipName());
     Assert.assertEquals(false, holder.value);
   }
 
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java
index 95019d8cc..71a67db85 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java
@@ -32,6 +32,7 @@
 import io.vertx.core.Context;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.json.JsonObject;
 import mockit.Expectations;
 import mockit.Mocked;
@@ -40,7 +41,7 @@
   @Test
   public void testHighwayVerticle(@Mocked Transport transport, @Mocked Vertx 
vertx, @Mocked Context context,
       @Mocked JsonObject json) {
-    HighwayServerVerticle highwayVerticle = new HighwayServerVerticle(new 
AtomicInteger());
+    HighwayServerVerticle highwayVerticle = new HighwayServerVerticle(new 
AtomicInteger(), new ConcurrentHashSet<>());
     URIEndpointObject endpiontObject = new 
URIEndpointObject("highway://127.0.0.1:9090");
     new Expectations() {
       {
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
index 53c3890ee..d362c3954 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
@@ -63,12 +63,16 @@
 
   private final AtomicInteger connectedCounter;
 
+  private final Set<String> connectedAddresses;
+
   public RestServerVerticle() {
-    
this(CseContext.getInstance().getTransportManager().findTransport(Const.RESTFUL).getConnectedCounter());
+    
this(CseContext.getInstance().getTransportManager().findTransport(Const.RESTFUL).getConnectedCounter(),
+        
CseContext.getInstance().getTransportManager().findTransport(Const.RESTFUL).getConnectedAddresses());
   }
 
-  public RestServerVerticle(AtomicInteger connectedCounter) {
+  public RestServerVerticle(AtomicInteger connectedCounter, Set<String> 
connectedAddresses) {
     this.connectedCounter = connectedCounter;
+    this.connectedAddresses = connectedAddresses;
   }
 
   @Override
@@ -95,6 +99,7 @@ public void start(Future<Void> startFuture) throws Exception {
       HttpServer httpServer = createHttpServer();
       httpServer.requestHandler(mainRouter::accept);
       httpServer.connectionHandler(connection -> {
+        String address = connection.remoteAddress().toString();
         int connectedCount = connectedCounter.incrementAndGet();
         int connectionLimit = DynamicPropertyFactory.getInstance()
             .getIntProperty("servicecomb.rest.server.connection-limit", 
Integer.MAX_VALUE).get();
@@ -102,10 +107,13 @@ public void start(Future<Void> startFuture) throws 
Exception {
           connectedCounter.decrementAndGet();
           connection.close();
         } else {
-          EventManager.post(new 
ClientEvent(connection.remoteAddress().toString(),
-              ConnectionEvent.Connected, TransportType.Rest, connectedCount));
-          connection.closeHandler(event -> EventManager.post(new 
ClientEvent(connection.remoteAddress().toString(),
-              ConnectionEvent.Closed, TransportType.Rest, 
connectedCounter.decrementAndGet())));
+          connectedAddresses.add(address);
+          EventManager.post(new ClientEvent(address, 
ConnectionEvent.Connected, TransportType.Rest, connectedCount));
+          connection.closeHandler(event -> {
+            connectedAddresses.remove(address);
+            EventManager.post(new ClientEvent(address,
+                ConnectionEvent.Closed, TransportType.Rest, 
connectedCounter.decrementAndGet()));
+          });
         }
       });
 
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
index 20f70bd96..443915e5f 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
@@ -73,9 +73,9 @@ private void failureHandler(RoutingContext context) {
 
     AbstractRestInvocation restProducerInvocation = 
context.get(RestConst.REST_PRODUCER_INVOCATION);
     Throwable e = context.failure();
-    if (ErrorDataDecoderException.class.isInstance(e)) {
+    if (e instanceof ErrorDataDecoderException) {
       Throwable cause = e.getCause();
-      if (InvocationException.class.isInstance(cause)) {
+      if (cause instanceof InvocationException) {
         e = cause;
       }
     }
@@ -129,7 +129,7 @@ private void 
sendFailureRespDeterminedByStatus(RoutingContext context) {
    * Use routingContext to send failure information in throwable.
    */
   private void sendExceptionByRoutingContext(RoutingContext context, Throwable 
e) {
-    if (InvocationException.class.isInstance(e)) {
+    if (e instanceof InvocationException) {
       InvocationException invocationException = (InvocationException) e;
       context.response().putHeader(HttpHeaders.CONTENT_TYPE, 
MediaType.WILDCARD)
           
.setStatusCode(invocationException.getStatusCode()).setStatusMessage(invocationException.getReasonPhrase())
@@ -189,8 +189,11 @@ private void onRequest(RoutingContext context) {
     HttpServletRequestEx requestEx = new 
VertxServerRequestToHttpServletRequest(context);
     HttpServletResponseEx responseEx = new 
VertxServerResponseToHttpServletResponse(context.response());
 
-    VertxRestInvocation vertxRestInvocation = new VertxRestInvocation();
-    context.put(RestConst.REST_PRODUCER_INVOCATION, vertxRestInvocation);
-    vertxRestInvocation.invoke(transport, requestEx, responseEx, 
httpServerFilters);
+    //if connection closed no need invoke
+    if 
(transport.getConnectedAddresses().contains(context.request().remoteAddress().toString()))
 {
+      VertxRestInvocation vertxRestInvocation = new VertxRestInvocation();
+      context.put(RestConst.REST_PRODUCER_INVOCATION, vertxRestInvocation);
+      vertxRestInvocation.invoke(transport, requestEx, responseEx, 
httpServerFilters);
+    }
   }
 }
diff --git a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java
index fb7a13cfe..62e252f49 100644
--- a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java
+++ b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java
@@ -39,6 +39,7 @@
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpMethod;
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
@@ -57,7 +58,7 @@
 
   @Before
   public void setUp() {
-    instance = new RestServerVerticle(new AtomicInteger());
+    instance = new RestServerVerticle(new AtomicInteger(), new 
ConcurrentHashSet<>());
     startFuture = Future.future();
 
     CseContext.getInstance().setTransportManager(new TransportManager());
@@ -90,7 +91,7 @@ public void testRestServerVerticleWithRouter(@Mocked 
Transport transport, @Mocke
         result = endpiont;
       }
     };
-    RestServerVerticle server = new RestServerVerticle(new AtomicInteger());
+    RestServerVerticle server = new RestServerVerticle(new AtomicInteger(), 
new ConcurrentHashSet<>());
     // process stuff done by Expectations
     server.init(vertx, context);
     server.start(startFuture);
@@ -117,7 +118,7 @@ public void testRestServerVerticleWithRouterSSL(@Mocked 
Transport transport, @Mo
         result = endpiont;
       }
     };
-    RestServerVerticle server = new RestServerVerticle(new AtomicInteger());
+    RestServerVerticle server = new RestServerVerticle(new AtomicInteger(), 
new ConcurrentHashSet<>());
     // process stuff done by Expectations
     server.init(vertx, context);
     server.start(startFuture);
@@ -144,7 +145,7 @@ public void testRestServerVerticleWithHttp2(@Mocked 
Transport transport, @Mocked
         result = endpiont;
       }
     };
-    RestServerVerticle server = new RestServerVerticle(new AtomicInteger());
+    RestServerVerticle server = new RestServerVerticle(new AtomicInteger(), 
new ConcurrentHashSet<>());
     boolean status = false;
     try {
       server.init(vertx, context);
@@ -240,7 +241,7 @@ CorsHandler getCorsHandler(String corsAllowedOrigin) {
     Router router = Mockito.mock(Router.class);
     Mockito.when(router.route()).thenReturn(Mockito.mock(Route.class));
 
-    RestServerVerticle server = new RestServerVerticle(new AtomicInteger());
+    RestServerVerticle server = new RestServerVerticle(new AtomicInteger(), 
new ConcurrentHashSet<>());
 
     Deencapsulation.invoke(server, "mountCorsHandler", router);
     Assert.assertEquals(7, counter.get());
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
 
b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
index f02ee0ebb..9bb11ddc1 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.transport.rest.vertx;
 
+import static org.mockito.Mockito.when;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@
 import org.apache.servicecomb.common.rest.RestProducerInvocation;
 import org.apache.servicecomb.common.rest.VertxRestInvocation;
 import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
+import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.CseContext;
 import org.apache.servicecomb.core.Transport;
 import org.apache.servicecomb.core.transport.TransportManager;
@@ -40,6 +43,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import 
io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException;
 import io.vertx.core.AsyncResult;
@@ -50,6 +54,7 @@
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.impl.ConcurrentHashSet;
 import io.vertx.core.impl.VertxImpl;
 import io.vertx.core.json.JsonObject;
 import io.vertx.core.net.SocketAddress;
@@ -65,9 +70,6 @@
   @Mocked
   Router mainRouter;
 
-  @Mocked
-  TransportManager transportManager;
-
   VertxRestDispatcher dispatcher;
 
   Throwable throwable;
@@ -92,6 +94,15 @@ void invoke(Transport transport, HttpServletRequestEx 
requestEx, HttpServletResp
       }
     };
 
+    Transport t = Mockito.mock(Transport.class);
+    when(t.getName()).thenReturn(Const.RESTFUL);
+
+    when(t.getConnectedAddresses()).thenReturn(new ConcurrentHashSet<>());
+
+    TransportManager transportManager = Mockito.mock(TransportManager.class);
+
+    when(transportManager.findTransport(Const.RESTFUL)).thenReturn(t);
+
     CseContext.getInstance().setTransportManager(transportManager);
   }
 
@@ -298,12 +309,43 @@ HttpServerRequest request() {
         result = context;
       }
     };
+
+    
CseContext.getInstance().getTransportManager().findTransport(org.apache.servicecomb.core.Const.RESTFUL)
+        
.getConnectedAddresses().add(request.connection().remoteAddress().toString());
     Deencapsulation.invoke(dispatcher, "onRequest", routingContext);
 
     Assert.assertEquals(VertxRestInvocation.class, 
map.get(RestConst.REST_PRODUCER_INVOCATION).getClass());
     Assert.assertTrue(invoked);
   }
 
+  @Test
+  public void onRequestButConnectionClosed(@Mocked Context context, @Mocked 
HttpServerRequest request,
+      @Mocked SocketAddress socketAdrress) {
+    Map<String, Object> map = new HashMap<>();
+    RoutingContext routingContext = new MockUp<RoutingContext>() {
+      @Mock
+      RoutingContext put(String key, Object obj) {
+        map.put(key, obj);
+        return null;
+      }
+
+      @Mock
+      HttpServerRequest request() {
+        return request;
+      }
+    }.getMockInstance();
+
+    new Expectations(VertxImpl.class) {
+      {
+        VertxImpl.context();
+        result = context;
+      }
+    };
+
+    Deencapsulation.invoke(dispatcher, "onRequest", routingContext);
+    Assert.assertFalse(invoked);
+  }
+
   @Test
   public void testWrapResponseBody() {
     VertxRestDispatcher vertxRestDispatcher = new VertxRestDispatcher();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> When a producer invocation is about to run in the executor, make sure the 
> connection is not disconnected and the invocation has not timed out
> -------------------------------------------------------------------------------------------
>
>                 Key: SCB-690
>                 URL: https://issues.apache.org/jira/browse/SCB-690
>             Project: Apache ServiceComb
>          Issue Type: Sub-task
>          Components: Java-Chassis
>            Reporter: wujimin
>            Assignee: yangyongzheng
>            Priority: Major
>
> The Vert.x REST and highway transports do not control producer timeout.
>  
> Note: if the invocation is already running, nothing can be done; just let it run until it finishes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to