This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 4ccab5518c9553aafaac5c6637b097d7ce0f9176 Author: wujimin <[email protected]> AuthorDate: Mon Mar 12 11:15:42 2018 +0800 SCB-374 invocation from highway transport publish event --- .../transport/highway/HighwayCodec.java | 15 ++------ .../transport/highway/HighwayServer.java | 9 +++-- .../transport/highway/HighwayServerConnection.java | 9 ++++- .../transport/highway/HighwayServerInvoke.java | 39 ++++++++++++--------- .../transport/highway/HighwayServerVerticle.java | 2 +- .../transport/highway/HighwayTransport.java | 2 -- .../transport/highway/TestHighwayCodec.java | 24 +++++++------ .../highway/TestHighwayServerConnection.java | 6 +++- .../transport/highway/TestHighwayServerInvoke.java | 40 ++++++++++++++++++++++ 9 files changed, 97 insertions(+), 49 deletions(-) diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java index 525ad57..fc5a148 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java @@ -20,7 +20,6 @@ package org.apache.servicecomb.transport.highway; import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf; import org.apache.servicecomb.codec.protobuf.utils.WrapSchema; import org.apache.servicecomb.core.Invocation; -import org.apache.servicecomb.core.invocation.InvocationFactory; import org.apache.servicecomb.foundation.vertx.client.tcp.TcpData; import org.apache.servicecomb.foundation.vertx.tcp.TcpOutputStream; import org.apache.servicecomb.swagger.invocation.Response; @@ -31,15 +30,9 @@ import io.protostuff.runtime.ProtobufFeature; import io.vertx.core.buffer.Buffer; public final class HighwayCodec { - private static HighwayTransport highwayTransport; - private HighwayCodec() { } - public static void setHighwayTransport(HighwayTransport highwayTransport) { - HighwayCodec.highwayTransport = highwayTransport; - } - public static TcpOutputStream encodeRequest(long msgId, Invocation invocation, OperationProtobuf operationProtobuf, ProtobufFeature protobufFeature) throws Exception { // 写header @@ -56,17 +49,13 @@ public final class HighwayCodec { return os; } - public static Invocation decodeRequest(RequestHeader header, OperationProtobuf operationProtobuf, + public static void decodeRequest(Invocation invocation, RequestHeader header, OperationProtobuf operationProtobuf, Buffer bodyBuffer, ProtobufFeature protobufFeature) throws Exception { WrapSchema schema = operationProtobuf.getRequestSchema(); Object[] args = schema.readObject(bodyBuffer, protobufFeature); - Invocation invocation = - InvocationFactory.forProvider(highwayTransport.getEndpoint(), - operationProtobuf.getOperationMeta(), - args); + invocation.setSwaggerArguments(args); invocation.setContext(header.getContext()); - return invocation; } public static RequestHeader readRequestHeader(Buffer headerBuffer, diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java index c3c0c53..5aca854 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java @@ -17,18 +17,21 @@ package org.apache.servicecomb.transport.highway; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.foundation.vertx.server.TcpServer; import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection; public class HighwayServer extends TcpServer { + private Endpoint endpoint; - public HighwayServer(URIEndpointObject endpointObject) { - super(endpointObject); + public HighwayServer(Endpoint endpoint) { + super((URIEndpointObject) endpoint.getAddress()); + this.endpoint = endpoint; } @Override protected TcpServerConnection createTcpServerConnection() { - return new HighwayServerConnection(); + return new HighwayServerConnection(endpoint); } } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java index a8d7452..79b5e02 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java @@ -18,6 +18,7 @@ package org.apache.servicecomb.transport.highway; import javax.ws.rs.core.Response.Status; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler; import org.apache.servicecomb.foundation.vertx.server.TcpParser; import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection; @@ -35,8 +36,14 @@ import io.vertx.core.net.NetSocket; public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler { private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class); + private Endpoint endpoint; + private ProtobufFeature protobufFeature = new ProtobufFeature(); + public HighwayServerConnection(Endpoint endpoint) { + this.endpoint = endpoint; + } + @Override public void init(NetSocket netSocket) { splitter = new TcpParser(this); @@ -115,7 +122,7 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB } protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) { - HighwayServerInvoke invoke = new HighwayServerInvoke(protobufFeature); + HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint, protobufFeature); if (invoke.init(this, msgId, header, bodyBuffer)) { invoke.execute(); } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java index 0450187..3dd6006 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java @@ -24,15 +24,14 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager; import org.apache.servicecomb.codec.protobuf.utils.WrapSchema; import org.apache.servicecomb.core.Const; import org.apache.servicecomb.core.CseContext; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.definition.MicroserviceMetaManager; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; -import org.apache.servicecomb.core.metrics.InvocationStartedEvent; -import org.apache.servicecomb.foundation.common.event.EventBus; +import org.apache.servicecomb.core.invocation.InvocationFactory; import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection; -import org.apache.servicecomb.swagger.invocation.InvocationType; import org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.apache.servicecomb.transport.highway.message.RequestHeader; @@ -62,11 +61,16 @@ public class HighwayServerInvoke { private Buffer bodyBuffer; + private Endpoint endpoint; + + Invocation invocation; + public HighwayServerInvoke() { - this(null); + this(null, null); } - public HighwayServerInvoke(ProtobufFeature protobufFeature) { + public HighwayServerInvoke(Endpoint endpoint, ProtobufFeature protobufFeature) { + this.endpoint = endpoint; this.protobufFeature = protobufFeature; } @@ -107,9 +111,9 @@ public class HighwayServerInvoke { this.bodyBuffer = bodyBuffer; } - private void runInExecutor(InvocationStartedEvent startedEvent) { + private void runInExecutor() { try { - doRunInExecutor(startedEvent); + doRunInExecutor(); } catch (Throwable e) { String msg = String.format("handle request error, %s, msgId=%d", operationMeta.getMicroserviceQualifiedName(), @@ -120,16 +124,14 @@ public class HighwayServerInvoke { } } - private void doRunInExecutor(InvocationStartedEvent startedEvent) throws Exception { - Invocation invocation = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, protobufFeature); + private void doRunInExecutor() throws Exception { + invocation.onStartExecute(); + + HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, protobufFeature); invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, this.connection.getNetSocket().remoteAddress()); - //立刻设置开始时间,否则Finished时无法计算TotalTime - invocation.setStartTime(startedEvent.getStartedTime()); - invocation.triggerStartExecutionEvent(); invocation.next(response -> { sendResponse(invocation.getContext(), response); - invocation.triggerFinishedEvent(response.getStatusCode()); }); } @@ -155,6 +157,8 @@ public class HighwayServerInvoke { operationProtobuf.getOperationMeta().getMicroserviceQualifiedName(), msgId); LOGGER.error(msg, e); + } finally { + invocation.onFinish(response); } } @@ -162,9 +166,10 @@ public class HighwayServerInvoke { * start time in queue. */ public void execute() { - InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(), - InvocationType.PRODUCER, System.nanoTime()); - EventBus.getInstance().triggerEvent(startedEvent); - operationMeta.getExecutor().execute(() -> runInExecutor(startedEvent)); + invocation = InvocationFactory.forProvider(endpoint, + operationProtobuf.getOperationMeta(), + null); + invocation.onStart(); + operationMeta.getExecutor().execute(() -> runInExecutor()); } } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java index 9c892b8..123f5c7 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java @@ -66,7 +66,7 @@ public class HighwayServerVerticle extends AbstractVerticle { return; } - HighwayServer server = new HighwayServer(endpointObject); + HighwayServer server = new HighwayServer(endpoint); server.init(vertx, SSL_KEY, ar -> { if (ar.succeeded()) { InetSocketAddress socketAddress = ar.result(); diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java index 8f197aa..195741f 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java @@ -43,8 +43,6 @@ public class HighwayTransport extends AbstractTransport { public boolean init() throws Exception { highwayClient.init(transportVertx); - HighwayCodec.setHighwayTransport(this); - DeploymentOptions deployOptions = new DeploymentOptions().setInstances(HighwayConfig.getServerThreadCount()); setListenAddressWithoutSchema(HighwayConfig.getAddress(), Collections.singletonMap(TcpConst.LOGIN, "true")); SimpleJsonObject json = new SimpleJsonObject(); diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java index d387daf..1d7cd09 100644 --- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java @@ -19,12 +19,14 @@ package org.apache.servicecomb.transport.highway; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf; import org.apache.servicecomb.codec.protobuf.utils.WrapSchema; import org.apache.servicecomb.codec.protobuf.utils.schema.NotWrapSchema; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; @@ -51,6 +53,7 @@ import io.protostuff.runtime.ProtobufFeature; import io.vertx.core.buffer.Buffer; import mockit.Mock; import mockit.MockUp; +import mockit.Mocked; public class TestHighwayCodec { @@ -75,7 +78,6 @@ public class TestHighwayCodec { @BeforeClass public static void setupClass() { ProtobufCompatibleUtils.init(); - HighwayCodec.setHighwayTransport(new HighwayTransport()); } @Before @@ -126,17 +128,17 @@ public class TestHighwayCodec { } @Test - public void testDecodeRequest() { - boolean status = true; + public void testDecodeRequest(@Mocked Endpoint endpoint) throws Exception { + commonMock(); + Mockito.when(schemaMeta.getProviderHandlerChain()).thenReturn(Collections.emptyList()); + Object[] args = new Object[] {}; + Mockito.when(schema.readObject(bodyBuffer, null)).thenReturn(args); + + Invocation invocation = new Invocation(endpoint, operationMeta, null); - try { - commonMock(); - Invocation inv = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, null); - Assert.assertNotNull(inv); - } catch (Exception e) { - status = false; - } - Assert.assertTrue(status); + HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, null); + + Assert.assertSame(args, invocation.getSwaggerArguments()); } @Test diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java index 8f6e2d4..e823e4a 100644 --- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java @@ -24,6 +24,7 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager; import org.apache.servicecomb.codec.protobuf.utils.ProtobufSchemaUtils; import org.apache.servicecomb.codec.protobuf.utils.WrapSchema; import org.apache.servicecomb.core.CseContext; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.definition.MicroserviceMetaManager; import org.apache.servicecomb.core.definition.OperationMeta; @@ -59,6 +60,9 @@ public class TestHighwayServerConnection { MicroserviceMetaManager microserviceMetaManager; @Mocked + Endpoint endpoint; + + @Mocked NetSocketImpl netSocket; RequestHeader header = new RequestHeader(); @@ -71,7 +75,7 @@ public class TestHighwayServerConnection { result = new SocketAddressImpl(new InetSocketAddress("127.0.0.1", 80)); } }; - connection = new HighwayServerConnection(); + connection = new HighwayServerConnection(endpoint); connection.init(netSocket); header = new RequestHeader(); diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java index fb9b3b9..0dc902c 100644 --- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java @@ -17,17 +17,27 @@ package org.apache.servicecomb.transport.highway; +import javax.xml.ws.Holder; + import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; +import org.apache.servicecomb.core.event.InvocationFinishEvent; +import org.apache.servicecomb.core.event.InvocationStartEvent; import org.apache.servicecomb.core.executor.ReactiveExecutor; import org.apache.servicecomb.core.unittest.UnitTestMeta; +import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection; import org.apache.servicecomb.transport.common.MockUtil; import org.apache.servicecomb.transport.highway.message.RequestHeader; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import io.netty.buffer.ByteBuf; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; @@ -52,6 +62,16 @@ public class TestHighwayServerInvoke { private SocketAddress socketAddress; + @BeforeClass + public static void classSetup() { + EventManager.eventBus = new EventBus(); + } + + @AfterClass + public static void classTeardown() { + EventManager.eventBus = new EventBus(); + } + @Before public void setup() { unitTestMeta = new UnitTestMeta(); @@ -87,6 +107,21 @@ public class TestHighwayServerInvoke { @Test public void test() { + Holder<InvocationStartEvent> startHolder = new Holder<>(); + Holder<InvocationFinishEvent> finishHolder = new Holder<>(); + Object subscriber = new Object() { + @Subscribe + public void onStart(InvocationStartEvent event) { + startHolder.value = event; + } + + @Subscribe + public void onFinish(InvocationFinishEvent event) { + finishHolder.value = event; + } + }; + EventManager.register(subscriber); + MockUtil.getInstance().mockHighwayCodec(); SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class); @@ -111,6 +146,11 @@ public class TestHighwayServerInvoke { // exe失败 MockUtil.getInstance().decodeRequestSucc = false; highwayServerInvoke.execute(); + EventManager.unregister(subscriber); + Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP")); + Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation()); + Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation()); + Assert.assertTrue(highwayServerInvoke.invocation.getStartExecutionTime() != 0); } } -- To stop receiving notification emails like this one, please contact [email protected].
