This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit f8085a347f06c32ec9028f4385ab67530b75b2c4 Author: weichao666 <[email protected]> AuthorDate: Wed Oct 17 22:48:05 2018 +0800 [SCB-967] Support configed IP send request --- .../loadbalance/LoadbalanceHandler.java | 29 +++++++++- .../loadbalance/TestLoadBalanceHandler2.java | 65 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java index 95db713..ba10f8f 100644 --- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java +++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java @@ -26,9 +26,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.core.CseContext; +import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.Transport; import org.apache.servicecomb.core.exception.ExceptionUtils; import org.apache.servicecomb.core.provider.consumer.SyncResponseExecutor; import org.apache.servicecomb.foundation.common.cache.VersionedCache; @@ -61,6 +66,8 @@ import rx.Observable; public class LoadbalanceHandler implements Handler { public static final String CONTEXT_KEY_SERVER_LIST = "x-context-server-list"; + public static final String SERVICECOMB_SERVER_ENDPOINT = "servicecomb-server-endpoint"; + // just a wrapper to make sure in retry mode to choose a different server. class RetryLoadBalancer implements ILoadBalancer { // Enough times to make sure to choose a different server in high volume. @@ -150,7 +157,8 @@ public class LoadbalanceHandler implements Handler { // Old configurations check.Just print an error, because configurations may given in dynamic and fail on runtime. String policyName = DynamicPropertyFactory.getInstance() - .getStringProperty("servicecomb.loadbalance.NFLoadBalancerRuleClassName", null).get(); + .getStringProperty("servicecomb.loadbalance.NFLoadBalancerRuleClassName", null) + .get(); if (!StringUtils.isEmpty(policyName)) { LOGGER.error("[servicecomb.loadbalance.NFLoadBalancerRuleClassName] is not supported anymore." + "use [servicecomb.loadbalance.strategy.name] instead."); @@ -167,6 +175,17 @@ public class LoadbalanceHandler implements Handler { @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { + String endpointUri = invocation.getLocalContext(SERVICECOMB_SERVER_ENDPOINT); + if (endpointUri != null) { + boolean isRest = endpointUri.startsWith("rest://"); + if (!isRest) { + throw new InvocationException(Status.BAD_REQUEST, + "the endpoint's format of the configuration is incorrect, e.g rest://127.0.0.1:8080"); + } + Transport transport = CseContext.getInstance().getTransportManager().findTransport("rest"); + Endpoint endpoint = new Endpoint(transport, endpointUri); + invocation.setEndpoint(endpoint); + } String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName()); if (!isEqual(strategy, this.strategy)) { //配置变化,需要重新生成所有的lb实例 @@ -199,7 +218,9 @@ public class LoadbalanceHandler implements Handler { return; } chosenLB.getLoadBalancerStats().incrementNumRequests(server); - invocation.setEndpoint(server.getEndpoint()); + if (invocation.getEndpoint() == null) { + invocation.setEndpoint(server.getEndpoint()); + } invocation.next(resp -> { // this stats is for WeightedResponseTimeRule chosenLB.getLoadBalancerStats().noteResponseTime(server, (System.currentTimeMillis() - time)); @@ -307,7 +328,9 @@ public class LoadbalanceHandler implements Handler { ServiceCombServer server = (ServiceCombServer) s; chosenLB.getLoadBalancerStats().incrementNumRequests(s); invocation.setHandlerIndex(currentHandler); // for retry - invocation.setEndpoint(server.getEndpoint()); + if (invocation.getEndpoint() == null) { + invocation.setEndpoint(server.getEndpoint()); + } invocation.next(resp -> { if (isFailedResponse(resp)) { LOGGER.error("service {}, call error, msg is {}, server is {} ", diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java index f962c93..efd011e 100644 --- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java +++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java @@ -40,6 +40,7 @@ import org.apache.servicecomb.serviceregistry.api.registry.DataCenterInfo; import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManager; import org.apache.servicecomb.serviceregistry.discovery.DiscoveryTreeNode; +import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; @@ -197,4 +198,68 @@ public class TestLoadBalanceHandler2 { server = (ServiceCombServer) loadBalancer.chooseServer(invocation); Assert.assertEquals(server.getEndpoint().getEndpoint(), "rest://localhost:9091"); } + + @Test + public void testConfiguredEndpoint() { + ReferenceConfig referenceConfig = Mockito.mock(ReferenceConfig.class); + OperationMeta operationMeta = Mockito.mock(OperationMeta.class); + SchemaMeta schemaMeta = Mockito.mock(SchemaMeta.class); + when(operationMeta.getSchemaMeta()).thenReturn(schemaMeta); + MicroserviceMeta microserviceMeta = Mockito.mock(MicroserviceMeta.class); + when(schemaMeta.getMicroserviceMeta()).thenReturn(microserviceMeta); + when(schemaMeta.getMicroserviceName()).thenReturn("testMicroserviceName"); + when(microserviceMeta.getAppId()).thenReturn("testApp"); + when(referenceConfig.getVersionRule()).thenReturn("0.0.0+"); + when(referenceConfig.getTransport()).thenReturn("rest"); + Invocation invocation = new Invocation(referenceConfig, operationMeta, new Object[0]); + AsyncResponse asyncResp = Mockito.mock(AsyncResponse.class); + + InstanceCacheManager instanceCacheManager = Mockito.mock(InstanceCacheManager.class); + ServiceRegistry serviceRegistry = Mockito.mock(ServiceRegistry.class); + TransportManager transportManager = Mockito.mock(TransportManager.class); + Transport transport = Mockito.mock(Transport.class); + ArchaiusUtils.setProperty("servicecomb.loadbalance.filter.operation.enabled", "false"); + + // set up data + MicroserviceInstance myself = new MicroserviceInstance(); + + MicroserviceInstance findInstance = new MicroserviceInstance(); + List<String> findEndpoint = new ArrayList<>(); + findEndpoint.add("rest://localhost:9092"); + findInstance.setEndpoints(findEndpoint); + findInstance.setInstanceId("findInstance"); + + Map<String, MicroserviceInstance> data = new HashMap<>(); + DiscoveryTreeNode parent = new DiscoveryTreeNode().name("parent").data(data); + CseContext.getInstance().setTransportManager(transportManager); + + RegistryUtils.setServiceRegistry(serviceRegistry); + + when(serviceRegistry.getMicroserviceInstance()).thenReturn(myself); + when(serviceRegistry.getInstanceCacheManager()).thenReturn(instanceCacheManager); + when(instanceCacheManager.getOrCreateVersionedCache("testApp", "testMicroserviceName", "0.0.0+")) + .thenReturn(parent); + when(transportManager.findTransport("rest")).thenReturn(transport); + + LoadbalanceHandler handler = null; + + handler = new LoadbalanceHandler(); + data.put("findInstance", findInstance); + parent.cacheVersion(1); + handler = new LoadbalanceHandler(); + try { + handler.handle(invocation, asyncResp); + } catch (Exception e) { + + } + Assert.assertEquals("rest://localhost:9092", invocation.getEndpoint().getEndpoint()); + + invocation.addLocalContext("servicecomb-server-endpoint", "rest://127.0.0.1:8080"); + try { + handler.handle(invocation, asyncResp); + } catch (Exception e) { + + } + Assert.assertEquals("rest://127.0.0.1:8080", invocation.getEndpoint().getEndpoint()); + } }
