Repository: ignite Updated Branches: refs/heads/master 0b304042d -> 6679b6cbe
IGNITE-6562: Dynamic service deployment should use projection if NodeFilter is not set. This closes #2810. Signed-off-by: nikolay_tikhonov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6679b6cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6679b6cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6679b6cb Branch: refs/heads/master Commit: 6679b6cbe6a26f8e9ba2a02bcf56801811e99abd Parents: 0b30404 Author: Andrey V. Mashenkov <[email protected]> Authored: Mon Oct 16 14:35:21 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Oct 16 14:35:21 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteServicesImpl.java | 4 +- .../service/GridServiceProcessor.java | 59 +++++++++------- .../GridServiceProcessorMultiNodeSelfTest.java | 71 +++++++++++++++++++- 3 files changed, 105 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index 00d6078..7cbd4b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -235,7 +235,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer guard(); try { - saveOrGet(ctx.service().deployAll(cfgs)); + saveOrGet(ctx.service().deployAll(prj, cfgs)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -252,7 +252,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer guard(); try { - return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(cfgs)); + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(prj, cfgs)); } finally { unguard(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6f1dfc7..7097735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -89,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.security.SecurityException; @@ -263,15 +264,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration(); - if (cfgs != null) { - for (ServiceConfiguration c : cfgs) { - // Deploy only on server nodes by default. - if (c.getNodeFilter() == null) - c.setNodeFilter(ctx.cluster().get().forServers().predicate()); - } - - deployAll(Arrays.asList(cfgs)).get(); - } + if (cfgs != null) + deployAll(Arrays.asList(cfgs), ctx.cluster().get().forServers().predicate()).get(); if (log.isDebugEnabled()) log.debug("Started service processor."); @@ -474,9 +468,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite cfg.setService(svc); cfg.setTotalCount(totalCnt); cfg.setMaxPerNodeCount(maxPerNodeCnt); - cfg.setNodeFilter(F.<ClusterNode>alwaysTrue() == prj.predicate() ? null : prj.predicate()); - return deploy(cfg); + return deployAll(prj, Collections.singleton(cfg)); } /** @@ -499,14 +492,17 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite cfg.setTotalCount(1); cfg.setMaxPerNodeCount(1); - return deploy(cfg); + // Ignore projection here. + return deployAll(Collections.singleton(cfg), null); } /** * @param cfgs Service configurations. + * @param dfltNodeFilter Default NodeFilter. * @return Configurations to deploy. */ - private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs) { + private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs, + IgnitePredicate<ClusterNode> dfltNodeFilter) { List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size()); Marshaller marsh = ctx.config().getMarshaller(); @@ -516,6 +512,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite for (ServiceConfiguration cfg : cfgs) { Exception err = null; + // Deploy to projection node by default + // or only on server nodes if no projection . + if (cfg.getNodeFilter() == null && dfltNodeFilter != null) + cfg.setNodeFilter(dfltNodeFilter); + try { validate(cfg); } @@ -568,13 +569,31 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** + * @param prj Grid projection. * @param cfgs Service configurations. * @return Future for deployment. */ - public IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs) { + public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) { + if (prj == null) + // Deploy to servers by default if no projection specified. + return deployAll(cfgs, ctx.cluster().get().forServers().predicate()); + else if (prj.predicate() == F.<ClusterNode>alwaysTrue()) + return deployAll(cfgs, null); + else + // Deploy to predicate nodes by default. + return deployAll(cfgs, prj.predicate()); + } + + /** + * @param cfgs Service configurations. + * @param dfltNodeFilter Default NodeFilter. + * @return Future for deployment. + */ + private IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs, + @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) { assert cfgs != null; - PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs); + PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter); List<ServiceConfiguration> cfgsCp = srvCfg.cfgs; @@ -733,16 +752,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** - * @param cfg Service configuration. - * @return Future for deployment. - */ - public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) { - A.notNull(cfg, "cfg"); - - return deployAll(Collections.singleton(cfg)); - } - - /** * @param name Service name. * @return Future. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6679b6cb/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java index df7ddf1..517f061 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorMultiNodeSelfTest.java @@ -169,7 +169,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA // Since we start extra nodes, there may be extra start and cancel events, // so we check only the difference between start and cancel and // not start and cancel events individually. - assertEquals(name, nodeCount() + servers, DummyService.started(name) - DummyService.cancelled(name)); + assertEquals(name, nodeCount() + servers, DummyService.started(name) - DummyService.cancelled(name)); checkCount(name, g.services().serviceDescriptors(), nodeCount() + servers); } @@ -185,6 +185,73 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA /** * @throws Exception If failed. */ + public void testDeployOnEachProjectionNodeUpdateTopology() throws Exception { + // Prestart client node. + Ignite client = startGrid("client", getConfiguration("client").setClientMode(true)); + + try { + final String name = "serviceOnEachProjectionNodeUpdateTopology"; + + Ignite g = randomGrid(); + + int prestartedSrvcs = 1; + + CountDownLatch latch = new CountDownLatch(prestartedSrvcs); + + DummyService.exeLatch(name, latch); + + IgniteServices svcs = g.services(g.cluster().forClients()); + + IgniteFuture<?> fut = svcs.deployNodeSingletonAsync(name, new DummyService()); + + info("Deployed service: " + name); + + fut.get(); + + info("Finished waiting for service future: " + name); + + latch.await(); + + // Ensure service is deployed + assertNotNull(client.services().serviceProxy(name, Service.class, false, 2000)); + + assertEquals(name, prestartedSrvcs, DummyService.started(name)); + assertEquals(name, 0, DummyService.cancelled(name)); + + int servers = 2; + + int clients = 2; + + latch = new CountDownLatch(clients); + + DummyService.exeLatch(name, latch); + + startExtraNodes(servers, clients); + + try { + latch.await(); + + waitForDeployment(name, clients); + + // Since we start extra nodes, there may be extra start and cancel events, + // so we check only the difference between start and cancel and + // not start and cancel events individually. + assertEquals(name, clients + prestartedSrvcs, DummyService.started(name) - DummyService.cancelled(name)); + + checkCount(name, g.services().serviceDescriptors(), clients + prestartedSrvcs); + } + finally { + stopExtraNodes(servers + clients); + } + } + finally { + stopGrid("client"); + } + } + + /** + * @throws Exception If failed. + */ public void testDeployOnEachNodeUpdateTopology() throws Exception { // Prestart client node. Ignite client = startGrid("client", getConfiguration("client").setClientMode(true)); @@ -315,7 +382,7 @@ public class GridServiceProcessorMultiNodeSelfTest extends GridServiceProcessorA // Since we start extra nodes, there may be extra start and cancel events, // so we check only the difference between start and cancel and // not start and cancel events individually. - assertEquals(name, totalInstances, DummyService.started(name) - DummyService.cancelled(name)); + assertEquals(name, totalInstances, DummyService.started(name) - DummyService.cancelled(name)); checkCount(name, g.services().serviceDescriptors(), totalInstances); }
