This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 78ab2c4f93b3ea0ed16fe6e4a1d54fc3f9191856 Author: wujimin <[email protected]> AuthorDate: Sun Apr 22 04:22:15 2018 +0800 [SCB-486] edge server related client connection use the same context, so that file download can use pump logic. --- .../servicecomb/edge/core/EdgeBootListener.java | 5 ++++ .../servicecomb/edge/core/EdgeInvocation.java | 4 +++ .../edge/core/EdgeRestServerVerticle.java | 31 ++++++--------------- .../edge/core/EdgeRestTransportClient.java | 32 +++++++--------------- .../servicecomb/edge/core/TestEdgeInvocation.java | 1 + .../transport/rest/client/RestTransportClient.java | 14 ++++++++-- .../rest/client/RestTransportClientManager.java | 3 +- .../rest/client/TransportClientConfig.java | 16 +++++++++-- .../transport/rest/vertx/TransportConfig.java | 12 ++++++++ .../transport/rest/vertx/VertxRestTransport.java | 3 +- 10 files changed, 70 insertions(+), 51 deletions(-) diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java index 1762980..55ee392 100644 --- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java +++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java @@ -20,6 +20,8 @@ package org.apache.servicecomb.edge.core; import org.apache.commons.configuration.Configuration; import org.apache.servicecomb.core.BootListener; import org.apache.servicecomb.core.executor.ExecutorManager; +import org.apache.servicecomb.transport.rest.client.TransportClientConfig; +import org.apache.servicecomb.transport.rest.vertx.TransportConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -36,6 +38,9 @@ public class EdgeBootListener implements BootListener { return; } + TransportClientConfig.setRestTransportClientCls(EdgeRestTransportClient.class); + TransportConfig.setRestServerVerticle(EdgeRestServerVerticle.class); + String defaultExecutor = DynamicPropertyFactory.getInstance() .getStringProperty(ExecutorManager.KEY_EXECUTORS_DEFAULT, null) .get(); diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java index 71a9f65..4991443 100644 --- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java +++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java @@ -36,9 +36,12 @@ import org.apache.servicecomb.serviceregistry.RegistryUtils; import org.apache.servicecomb.serviceregistry.consumer.MicroserviceVersionRule; import org.apache.servicecomb.serviceregistry.definition.DefinitionConst; +import io.vertx.core.Vertx; import io.vertx.ext.web.RoutingContext; public class EdgeInvocation extends AbstractRestInvocation { + public static final String EDGE_INVOCATION_CONTEXT = "edgeInvocationContext"; + protected String microserviceName; protected MicroserviceVersionRule microserviceVersionRule; @@ -122,6 +125,7 @@ public class EdgeInvocation extends AbstractRestInvocation { restOperationMeta.getOperationMeta(), null); this.invocation.setSync(false); + this.invocation.getHandlerContext().put(EDGE_INVOCATION_CONTEXT, Vertx.currentContext()); this.invocation.setResponseExecutor(new ReactiveResponseExecutor()); } } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java similarity index 50% copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java index f969ba5..a7d7529 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java +++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java @@ -14,30 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.servicecomb.edge.core; -package org.apache.servicecomb.transport.rest.client; +import org.apache.servicecomb.transport.rest.client.RestTransportClient; +import org.apache.servicecomb.transport.rest.vertx.RestServerVerticle; -import org.apache.servicecomb.foundation.vertx.VertxUtils; +public class EdgeRestServerVerticle extends RestServerVerticle { + @Override + public void start() throws Exception { + super.start(); -import io.vertx.core.Vertx; - -public final class RestTransportClientManager { - public static final RestTransportClientManager INSTANCE = new RestTransportClientManager(); - - // same instance in AbstractTranport. need refactor in future. - private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null); - - private RestTransportClient restClient = new RestTransportClient(); - - private RestTransportClientManager() { - try { - restClient.init(transportVertx); - } catch (Exception e) { - throw new IllegalStateException("Failed to init RestTransportClient.", e); - } - } - - public RestTransportClient getRestClient() { - return restClient; + RestTransportClient restClient = (RestTransportClient) config().getValue(RestTransportClient.class.getName()); + restClient.getClientMgr().findClientPool(false, context); } } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java similarity index 50% copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java index f969ba5..bfac386 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java +++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java @@ -14,30 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.servicecomb.edge.core; -package org.apache.servicecomb.transport.rest.client; +import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext; +import org.apache.servicecomb.transport.rest.client.RestTransportClient; -import org.apache.servicecomb.foundation.vertx.VertxUtils; +import io.vertx.core.Context; -import io.vertx.core.Vertx; - -public final class RestTransportClientManager { - public static final RestTransportClientManager INSTANCE = new RestTransportClientManager(); - - // same instance in AbstractTranport. need refactor in future. - private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null); - - private RestTransportClient restClient = new RestTransportClient(); - - private RestTransportClientManager() { - try { - restClient.init(transportVertx); - } catch (Exception e) { - throw new IllegalStateException("Failed to init RestTransportClient.", e); - } - } - - public RestTransportClient getRestClient() { - return restClient; +public class EdgeRestTransportClient extends RestTransportClient { + @Override + protected HttpClientWithContext findHttpClientPool(Invocation invocation) { + Context invocationContext = (Context) invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT); + return clientMgr.findClientPool(invocation.isSync(), invocationContext); } } diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java index 302656f..2f821a8 100644 --- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java +++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java @@ -228,5 +228,6 @@ public class TestEdgeInvocation { Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation"); Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class)); Assert.assertFalse(invocation.isSync()); + Assert.assertSame(context, invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT)); } } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java index 937097a..8c43814 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java @@ -37,15 +37,19 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; import io.vertx.core.http.HttpClientOptions; -public final class RestTransportClient { +public class RestTransportClient { private static final Logger LOGGER = LoggerFactory.getLogger(RestTransportClient.class); private static final String SSL_KEY = "rest.consumer"; - private ClientPoolManager<HttpClientWithContext> clientMgr; + protected ClientPoolManager<HttpClientWithContext> clientMgr; private List<HttpClientFilter> httpClientFilters; + public ClientPoolManager<HttpClientWithContext> getClientMgr() { + return clientMgr; + } + public void init(Vertx vertx) throws Exception { httpClientFilters = SPIServiceUtils.getSortedService(HttpClientFilter.class); @@ -69,7 +73,7 @@ public final class RestTransportClient { } public void send(Invocation invocation, AsyncResponse asyncResp) { - HttpClientWithContext httpClientWithContext = clientMgr.findClientPool(invocation.isSync()); + HttpClientWithContext httpClientWithContext = findHttpClientPool(invocation); RestClientInvocation restClientInvocation = new RestClientInvocation(httpClientWithContext, httpClientFilters); try { restClientInvocation.invoke(invocation, asyncResp); @@ -78,4 +82,8 @@ public final class RestTransportClient { LOGGER.error("vertx rest transport send error.", e); } } + + protected HttpClientWithContext findHttpClientPool(Invocation invocation) { + return clientMgr.findClientPool(invocation.isSync()); + } } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java index f969ba5..3dfd785 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java @@ -27,10 +27,11 @@ public final class RestTransportClientManager { // same instance in AbstractTranport. need refactor in future. private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null); - private RestTransportClient restClient = new RestTransportClient(); + private RestTransportClient restClient; private RestTransportClientManager() { try { + restClient = TransportClientConfig.getRestTransportClientCls().newInstance(); restClient.init(transportVertx); } catch (Exception e) { throw new IllegalStateException("Failed to init RestTransportClient.", e); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java index 9803267..15d1226 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java @@ -20,9 +20,19 @@ package org.apache.servicecomb.transport.rest.client; import com.netflix.config.DynamicPropertyFactory; public final class TransportClientConfig { + private static Class<? extends RestTransportClient> restTransportClientCls = RestTransportClient.class; + private TransportClientConfig() { } + public static Class<? extends RestTransportClient> getRestTransportClientCls() { + return restTransportClientCls; + } + + public static void setRestTransportClientCls(Class<? extends RestTransportClient> restTransportClientCls) { + TransportClientConfig.restTransportClientCls = restTransportClientCls; + } + public static int getThreadCount() { return DynamicPropertyFactory.getInstance().getIntProperty("cse.rest.client.thread-count", 1).get(); } @@ -40,8 +50,10 @@ public final class TransportClientConfig { public static boolean getConnectionKeepAlive() { return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.keepAlive", true).get(); } - + public static boolean getConnectionCompression() { - return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.compression", false).get(); + return DynamicPropertyFactory.getInstance() + .getBooleanProperty("cse.rest.client.connection.compression", false) + .get(); } } diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java index fee033c..01f13c8 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java @@ -21,6 +21,8 @@ import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; import com.netflix.config.DynamicStringProperty; +import io.vertx.core.Verticle; + public final class TransportConfig { public static final int DEFAULT_SERVER_THREAD_COUNT = 1; @@ -32,9 +34,19 @@ public final class TransportConfig { // 32K public static final int DEFAULT_SERVER_MAX_HEADER_SIZE = 32 * 1024; + private static Class<? extends Verticle> restServerVerticle = RestServerVerticle.class; + private TransportConfig() { } + public static Class<? extends Verticle> getRestServerVerticle() { + return restServerVerticle; + } + + public static void setRestServerVerticle(Class<? extends Verticle> restServerVerticle) { + TransportConfig.restServerVerticle = restServerVerticle; + } + public static String getAddress() { DynamicStringProperty address = DynamicPropertyFactory.getInstance().getStringProperty("cse.rest.address", null); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java index 66f4638..008792c 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java @@ -74,8 +74,9 @@ public class VertxRestTransport extends AbstractTransport { DeploymentOptions options = new DeploymentOptions().setInstances(TransportConfig.getThreadCount()); SimpleJsonObject json = new SimpleJsonObject(); json.put(ENDPOINT_KEY, getEndpoint()); + json.put(RestTransportClient.class.getName(), restClient); options.setConfig(json); - return VertxUtils.blockDeploy(transportVertx, RestServerVerticle.class, options); + return VertxUtils.blockDeploy(transportVertx, TransportConfig.getRestServerVerticle(), options); } @Override -- To stop receiving notification emails like this one, please contact [email protected].
