ignite-5145 Support multiple service deployment in API
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b6da976 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b6da976 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b6da976 Branch: refs/heads/ignite-5896 Commit: 0b6da9766ea5d3a096bbbf63de5ebbbe3a883677 Parents: e791254 Author: Denis Mekhanikov <[email protected]> Authored: Tue Sep 5 12:05:50 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Sep 5 12:05:50 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteServices.java | 112 ++- .../ignite/internal/IgniteServicesImpl.java | 64 +- .../discovery/GridDiscoveryManager.java | 5 - .../GridServiceDeploymentCompoundFuture.java | 196 +++++ .../service/GridServiceProcessor.java | 555 +++++++++----- .../service/PreparedConfigurations.java | 53 ++ .../service/ServiceDeploymentException.java | 78 ++ .../util/future/GridCompoundFuture.java | 15 +- ...ServiceDeploymentCompoundFutureSelfTest.java | 241 ++++++ ...GridServiceProcessorBatchDeploySelfTest.java | 741 +++++++++++++++++++ .../testsuites/IgniteKernalSelfTestSuite.java | 4 + 11 files changed, 1821 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/IgniteServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java index 1c01598..271adbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java @@ -20,6 +20,7 @@ package org.apache.ignite; import java.util.Collection; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.service.ServiceDeploymentException; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteFuture; @@ -156,10 +157,10 @@ public interface IgniteServices extends IgniteAsyncSupport { * * @param name Service name. * @param svc Service instance. - * @throws IgniteException If failed to deploy service. + * @throws ServiceDeploymentException If failed to deploy service. */ @IgniteAsyncSupported - public void deployClusterSingleton(String name, Service svc) throws IgniteException; + public void deployClusterSingleton(String name, Service svc) throws ServiceDeploymentException; /** * Asynchronously deploys a cluster-wide singleton service. Ignite will guarantee that there is always @@ -178,9 +179,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param name Service name. * @param svc Service instance. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to deploy service. */ - public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException; + public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc); /** * Deploys a per-node singleton service. Ignite will guarantee that there is always @@ -194,10 +194,10 @@ public interface IgniteServices extends IgniteAsyncSupport { * * @param name Service name. * @param svc Service instance. - * @throws IgniteException If failed to deploy service. + * @throws ServiceDeploymentException If failed to deploy service. */ @IgniteAsyncSupported - public void deployNodeSingleton(String name, Service svc) throws IgniteException; + public void deployNodeSingleton(String name, Service svc) throws ServiceDeploymentException; /** * Asynchronously deploys a per-node singleton service. Ignite will guarantee that there is always @@ -212,9 +212,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param name Service name. * @param svc Service instance. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to deploy service. */ - public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException; + public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc); /** * Deploys one instance of this service on the primary node for a given affinity key. @@ -245,11 +244,11 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param cacheName Name of the cache on which affinity for key should be calculated, {@code null} for * default cache. * @param affKey Affinity cache key. - * @throws IgniteException If failed to deploy service. + * @throws ServiceDeploymentException If failed to deploy service. */ @IgniteAsyncSupported public void deployKeyAffinitySingleton(String name, Service svc, @Nullable String cacheName, Object affKey) - throws IgniteException; + throws ServiceDeploymentException; /** * Asynchronously deploys one instance of this service on the primary node for a given affinity key. @@ -281,10 +280,9 @@ public interface IgniteServices extends IgniteAsyncSupport { * default cache. * @param affKey Affinity cache key. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to deploy service. */ public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, @Nullable String cacheName, - Object affKey) throws IgniteException; + Object affKey); /** * Deploys multiple instances of the service on the grid. Ignite will deploy a @@ -314,10 +312,11 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param svc Service instance. * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited. * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited. - * @throws IgniteException If failed to deploy service. + * @throws ServiceDeploymentException If failed to deploy service. */ @IgniteAsyncSupported - public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) throws IgniteException; + public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) + throws ServiceDeploymentException; /** * Asynchronously deploys multiple instances of the service on the grid. Ignite will deploy a @@ -348,10 +347,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited. * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to deploy service. */ - public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt) - throws IgniteException; + public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt); /** * Deploys multiple instances of the service on the grid according to provided @@ -390,10 +387,10 @@ public interface IgniteServices extends IgniteAsyncSupport { * </pre> * * @param cfg Service configuration. - * @throws IgniteException If failed to deploy service. + * @throws ServiceDeploymentException If failed to deploy service. */ @IgniteAsyncSupported - public void deploy(ServiceConfiguration cfg) throws IgniteException; + public void deploy(ServiceConfiguration cfg) throws ServiceDeploymentException; /** * Asynchronously deploys multiple instances of the service on the grid according to provided @@ -433,9 +430,51 @@ public interface IgniteServices extends IgniteAsyncSupport { * * @param cfg Service configuration. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to deploy service. */ - public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException; + public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg); + + /** + * Deploys multiple services described by provided configurations. Depending on specified parameters, multiple + * instances of the same service may be deployed (see {@link ServiceConfiguration}). + * Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * + * If deployment fails, then {@link ServiceDeploymentException} containing a list of failed services will be + * thrown. It is guaranteed that all services that were provided to this method and are not present in the list of + * failed services are successfully deployed by the moment of the exception being thrown. + * + * @param cfgs {@link Collection} of service configurations to be deployed. + * @param allOrNone Specifies behavior in case when errors during deployment occur. If {@code true}, then two + * outcomes are possible: either all services will be deployed, or none of them. If {@code false}, then partial + * deployments are permitted. + * @throws ServiceDeploymentException If failed to deploy services. + * @see IgniteServices#deploy(ServiceConfiguration) + * @see IgniteServices#deployAllAsync(Collection, boolean) + */ + public void deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) throws ServiceDeploymentException; + + /** + * Asynchronously deploys multiple services described by provided configurations. Depending on specified parameters, + * multiple instances of the same service may be deployed (see {@link ServiceConfiguration}). + * Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * + * If deployment fails, then {@link ServiceDeploymentException} containing a list of failed services will be + * thrown from {@link IgniteFuture#get get()} method of the returned future. It is guaranteed that all services, + * that were provided to this method and are not present in the list of failed services, are successfully deployed + * by the moment of the exception being thrown. + * + * @param cfgs {@link Collection} of service configurations to be deployed. + * @param allOrNone Specifies behavior in case when errors during deployment occur. If {@code true}, then two + * outcomes are possible: either all services will be deployed, or none of them. If {@code false}, then partial + * deployments are permitted. + * @return a Future representing pending completion of the operation. + * @see IgniteServices#deploy(ServiceConfiguration) + * @see IgniteServices#deployAll(Collection,boolean) + */ + public IgniteFuture<Void> deployAllAsync(Collection<ServiceConfiguration> cfgs, boolean allOrNone); /** * Cancels service deployment. If a service with specified name was deployed on the grid, @@ -468,9 +507,33 @@ public interface IgniteServices extends IgniteAsyncSupport { * * @param name Name of service to cancel. * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to cancel service. */ - public IgniteFuture<Void> cancelAsync(String name) throws IgniteException; + public IgniteFuture<Void> cancelAsync(String name); + + /** + * Cancels services with specified names. + * <p> + * Note that depending on user logic, it may still take extra time for a service to + * finish execution, even after it was cancelled. + * <p> + * Supports asynchronous execution (see {@link IgniteAsyncSupport}). + * + * @param names Names of services to cancel. + * @throws IgniteException If failed to cancel services. + */ + @IgniteAsyncSupported + public void cancelAll(Collection<String> names) throws IgniteException; + + /** + * Asynchronously cancels services with specified names. + * <p> + * Note that depending on user logic, it may still take extra time for a service to + * finish execution, even after it was cancelled. + * + * @param names Names of services to cancel. + * @return a Future representing pending completion of the operation. + */ + public IgniteFuture<Void> cancelAllAsync(Collection<String> names); /** * Cancels all deployed services. @@ -492,9 +555,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * finish execution, even after it was cancelled. * * @return a Future representing pending completion of the operation. - * @throws IgniteException If failed to cancel services. */ - public IgniteFuture<Void> cancelAllAsync() throws IgniteException; + public IgniteFuture<Void> cancelAllAsync(); /** * Gets metadata about all deployed services in the grid. http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index 607dccc..ad455d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -23,6 +23,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; import java.util.Collection; +import java.util.Collections; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; @@ -94,7 +95,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException { + @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -127,7 +128,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException { + @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -160,8 +161,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, - int maxPerNodeCnt) throws IgniteException { + @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -198,7 +198,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer /** {@inheritDoc} */ @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, - @Nullable String cacheName, Object affKey) throws IgniteException { + @Nullable String cacheName, Object affKey) { A.notNull(name, "name"); A.notNull(svc, "svc"); A.notNull(affKey, "affKey"); @@ -218,10 +218,24 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer @Override public void deploy(ServiceConfiguration cfg) { A.notNull(cfg, "cfg"); + deployAll(Collections.singleton(cfg), false); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) { + A.notNull(cfg, "cfg"); + + return deployAllAsync(Collections.singleton(cfg), false); + } + + /** {@inheritDoc} */ + @Override public void deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) { + A.notNull(cfgs, "cfgs"); + guard(); try { - saveOrGet(ctx.service().deploy(cfg)); + saveOrGet(ctx.service().deployAll(cfgs, allOrNone)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -232,13 +246,14 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException { - A.notNull(cfg, "cfg"); + @Override public IgniteFuture<Void> deployAllAsync(Collection<ServiceConfiguration> cfgs, + boolean allOrNone) { + A.notNull(cfgs, "cfgs"); guard(); try { - return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deploy(cfg)); + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployAll(cfgs, allOrNone)); } finally { unguard(); @@ -263,7 +278,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> cancelAsync(String name) throws IgniteException { + @Override public IgniteFuture<Void> cancelAsync(String name) { A.notNull(name, "name"); guard(); @@ -277,6 +292,33 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public void cancelAll(Collection<String> names) { + guard(); + + try { + saveOrGet(ctx.service().cancelAll(names)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> cancelAllAsync(Collection<String> names) { + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancelAll(names)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void cancelAll() { guard(); @@ -292,7 +334,7 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ - @Override public IgniteFuture<Void> cancelAllAsync() throws IgniteException { + @Override public IgniteFuture<Void> cancelAllAsync() { guard(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 74f7fa6..56af9bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -30,7 +30,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,9 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.zip.CRC32; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; @@ -897,8 +894,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { checkAttributes(discoCache().remoteNodes()); - ctx.service().initCompatibilityMode(discoCache().remoteNodes()); - // Start discovery worker. new IgniteThread(discoWrk).start(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java new file mode 100644 index 0000000..12b88e5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.services.ServiceConfiguration; +import org.jetbrains.annotations.Nullable; + +/** + * Service deployment compound future, {@code allOrNone} parameter specifies failing policy. + * <p> + * If {@code allOrNone} parameter is set to {@code false}, then this future waits for completion of all child futures. + * If any exceptions are thrown during deployment, then {@link IgniteCheckedException} with {@link + * ServiceDeploymentException} as a cause will be thrown from {@link IgniteInternalFuture#get get()} method after all + * futures complete or fail. Inner exception will contain configurations of failed services. + */ +public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Object, Object> { + /** */ + private final boolean allOrNone; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Names of services written to cache during current deployment. */ + private Collection<String> svcsToRollback; + + /** */ + private volatile ServiceDeploymentException err; + + /** + * @param allOrNone Failing policy. + * @param ctx Kernal context. + */ + GridServiceDeploymentCompoundFuture(boolean allOrNone, GridKernalContext ctx) { + this.allOrNone = allOrNone; + this.ctx = ctx; + this.log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Object> fut) { + assert fut instanceof GridServiceDeploymentFuture : fut; + + GridServiceDeploymentFuture depFut = (GridServiceDeploymentFuture)fut; + + if (allOrNone) { + if (initialized()) { + onDone(new IgniteCheckedException( + new ServiceDeploymentException("Failed to deploy provided services.", err, getConfigurations()))); + } + else { + synchronized (this) { + if (this.err == null) { + this.err = new ServiceDeploymentException("Failed to deploy provided services.", err, + new ArrayList<ServiceConfiguration>()); + } + else + this.err.addSuppressed(err); + } + } + } + else { + synchronized (this) { + if (this.err == null) + this.err = new ServiceDeploymentException("Failed to deploy some services.", + new ArrayList<ServiceConfiguration>()); + + this.err.getFailedConfigurations().add(depFut.configuration()); + this.err.addSuppressed(err); + } + } + + return true; + } + + /** + * Marks this future as initialized. Will complete with error if failures before initialization occurred and + * all-or-none policy is followed. + */ + public void serviceDeploymentMarkInitialized() { + if (allOrNone && this.err != null) { + this.err.getFailedConfigurations().addAll(getConfigurations()); + + onDone(new IgniteCheckedException(this.err)); + } + else + super.markInitialized(); + } + + /** {@inheritDoc} */ + @Override protected boolean onDone(@Nullable final Object res, @Nullable Throwable err, final boolean cancel) { + final Throwable resErr; + + if (err == null && this.err != null) + resErr = new IgniteCheckedException(this.err); + else + resErr = err; + + if (allOrNone && this.err != null && svcsToRollback != null) { + U.warn(log, "Failed to deploy provided services. The following services will be cancelled:" + svcsToRollback); + + IgniteInternalFuture<?> fut = ctx.service().cancelAll(svcsToRollback); + + /* + Can not call fut.get() since it is possible we are in system pool now and + fut also should be completed from system pool. + */ + fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel deployed services.", e); + } + finally { + svcsToRollback = null; + } + + GridServiceDeploymentCompoundFuture.super.onDone(res, resErr, cancel); + } + }); + + return false; + } + + return super.onDone(res, resErr, cancel); + } + + /** + * @param fut Child future. + * @param own If {@code true}, then corresponding service will be cancelled on failure. + */ + public void add(GridServiceDeploymentFuture fut, boolean own) { + super.add(fut); + + if (own) { + if (svcsToRollback == null) + svcsToRollback = new ArrayList<>(); + + svcsToRollback.add(fut.configuration().getName()); + } + } + + /** + * @return Collection of names of services that were written to cache during current deployment. + */ + public Collection<String> servicesToRollback() { + if (svcsToRollback != null) + return svcsToRollback; + else + return Collections.emptyList(); + } + + /** + * @return Collection of configurations, stored in child futures. + */ + private Collection<ServiceConfiguration> getConfigurations() { + Collection<IgniteInternalFuture<Object>> futs = futures(); + + List<ServiceConfiguration> cfgs = new ArrayList<>(futs.size()); + + for (IgniteInternalFuture<Object> fut : futs) + cfgs.add(((GridServiceDeploymentFuture)fut).configuration()); + + return cfgs; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 1d8720c..3adad23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.service; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -32,7 +34,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; @@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.JobContextResource; @@ -98,6 +100,7 @@ import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; import org.apache.ignite.thread.IgniteThreadFactory; +import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -108,6 +111,7 @@ import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** @@ -129,9 +133,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT }; - /** */ - private final AtomicReference<ServicesCompatibilityState> compatibilityState; - /** Local service instances. */ private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>(); @@ -173,9 +174,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite String servicesCompatibilityMode = getString(IGNITE_SERVICES_COMPATIBILITY_MODE); srvcCompatibilitySysProp = servicesCompatibilityMode == null ? null : Boolean.valueOf(servicesCompatibilityMode); - - compatibilityState = new AtomicReference<>( - new ServicesCompatibilityState(srvcCompatibilitySysProp != null ? srvcCompatibilitySysProp : false, false)); } /** @@ -265,19 +263,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite ServiceConfiguration[] cfgs = ctx.config().getServiceConfiguration(); if (cfgs != null) { - Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(); - for (ServiceConfiguration c : cfgs) { // Deploy only on server nodes by default. if (c.getNodeFilter() == null) c.setNodeFilter(ctx.cluster().get().forServers().predicate()); - - futs.add(deploy(c)); } - // Await for services to deploy. - for (IgniteInternalFuture<?> f : futs) - f.get(); + deployAll(Arrays.asList(cfgs), true).get(); } if (log.isDebugEnabled()) @@ -501,183 +493,288 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** - * @param cfg Service configuration. - * @return Future for deployment. + * @param cfgs Service configurations. + * @param allOrNone Failure processing policy. + * @return Configurations to deploy. */ - public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) { - A.notNull(cfg, "cfg"); - - ServicesCompatibilityState state = markCompatibilityStateAsUsed(); + private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs, boolean allOrNone) { + List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size()); - validate(cfg); + Marshaller marsh = ctx.config().getMarshaller(); - ctx.security().authorize(cfg.getName(), SecurityPermission.SERVICE_DEPLOY, null); + List<GridServiceDeploymentFuture> failedFuts = null; - if (!state.srvcCompatibility) { - Marshaller marsh = ctx.config().getMarshaller(); - - LazyServiceConfiguration cfg0; + for (ServiceConfiguration cfg : cfgs) { + Exception err = null; try { - byte[] srvcBytes = U.marshal(marsh, cfg.getService()); - - cfg0 = new LazyServiceConfiguration(cfg, srvcBytes); + validate(cfg); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal service with configured marshaller [srvc=" + cfg.getService() - + ", marsh=" + marsh + "]", e); + catch (Exception e) { + U.error(log, "Failed to validate service configuration [name=" + cfg.getName() + + ", srvc=" + cfg.getService() + ']', e); - return new GridFinishedFuture<>(e); + err = e; } - cfg = cfg0; - } + if (err == null) { + try { + ctx.security().authorize(cfg.getName(), SecurityPermission.SERVICE_DEPLOY, null); + } + catch (Exception e) { + U.error(log, "Failed to authorize service creation [name=" + cfg.getName() + + ", srvc=" + cfg.getService() + ']', e); - GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg); + err = e; + } + } - GridServiceDeploymentFuture old = depFuts.putIfAbsent(cfg.getName(), fut); + if (err == null) { + try { + byte[] srvcBytes = U.marshal(marsh, cfg.getService()); - if (old != null) { - if (!old.configuration().equalsIgnoreNodeFilter(cfg)) { - fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with " + - "different configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']')); + cfgsCp.add(new LazyServiceConfiguration(cfg, srvcBytes)); + } + catch (Exception e) { + U.error(log, "Failed to marshal service with configured marshaller [name=" + cfg.getName() + + ", srvc=" + cfg.getService() + ", marsh=" + marsh + "]", e); - return fut; + err = e; + } } - return old; - } + if (err != null) { + if (allOrNone) { + return new PreparedConfigurations(null, + null, + new IgniteCheckedException( + new ServiceDeploymentException("None of the provided services were deplyed.", err, cfgs))); + } + else { + if (failedFuts == null) + failedFuts = new ArrayList<>(); - if (ctx.clientDisconnected()) { - fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to deploy service, client node disconnected.")); + GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg); + + fut.onDone(err); - depFuts.remove(cfg.getName(), fut); + failedFuts.add(fut); + } + } } - while (true) { - try { - GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName()); + return new PreparedConfigurations(cfgsCp, failedFuts, null); + } - if (ctx.deploy().enabled()) - ctx.cache().context().deploy().ignoreOwnership(true); + /** + * @param cfgs Service configurations. + * @param allOrNone Failure processing policy. + * @return Future for deployment. + */ + public IgniteInternalFuture<?> deployAll(Collection<ServiceConfiguration> cfgs, boolean allOrNone) { + assert cfgs != null; - try { - GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key, - new GridServiceDeployment(ctx.localNodeId(), cfg)); + PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, allOrNone); - if (dep != null) { - if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) { - // Remove future from local map. - depFuts.remove(cfg.getName(), fut); + if (srvCfg.err != null) + return new GridFinishedFuture<>(srvCfg.err); - fut.onDone(new IgniteCheckedException("Failed to deploy service (service already exists with " + - "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']')); - } - else { - Iterator<Cache.Entry<Object, Object>> it = serviceEntries( - ServiceAssignmentsPredicate.INSTANCE); + List<ServiceConfiguration> cfgsCp = srvCfg.cfgs; - while (it.hasNext()) { - Cache.Entry<Object, Object> e = it.next(); + List<GridServiceDeploymentFuture> failedFuts = srvCfg.failedFuts; - GridServiceAssignments assigns = (GridServiceAssignments)e.getValue(); + Collections.sort(cfgsCp, new Comparator<ServiceConfiguration>() { + @Override public int compare(ServiceConfiguration cfg1, ServiceConfiguration cfg2) { + return cfg1.getName().compareTo(cfg2.getName()); + } + }); - if (assigns.name().equals(cfg.getName())) { - // Remove future from local map. - depFuts.remove(cfg.getName(), fut); + GridServiceDeploymentCompoundFuture res; - fut.onDone(); + while (true) { + res = new GridServiceDeploymentCompoundFuture(allOrNone, ctx); - break; - } + if (ctx.deploy().enabled()) + ctx.cache().context().deploy().ignoreOwnership(true); + + try { + if (cfgsCp.size() == 1) + writeServiceToCache(res, cfgsCp.get(0)); + else if (cfgsCp.size() > 1) { + try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) { + for (ServiceConfiguration cfg : cfgsCp) { + try { + writeServiceToCache(res, cfg); } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class)) + throw e; // Retry. + + if (allOrNone) { + for (String name : res.servicesToRollback()) + depFuts.remove(name).onDone(e); + + res.onDone(new IgniteCheckedException(new ServiceDeploymentException( + "Failed to deploy provided services.", e, cfgs))); - if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) - U.warn(log, "Service already deployed with different configuration (will ignore) " + - "[deployed=" + dep.configuration() + ", new=" + cfg + ']'); + return res; + } + } } + + tx.commit(); } } - finally { - if (ctx.deploy().enabled()) - ctx.cache().context().deploy().ignoreOwnership(false); - } - return fut; - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Topology changed while deploying service (will retry): " + e.getMessage()); + break; } - catch (IgniteCheckedException e) { - if (e.hasCause(ClusterTopologyCheckedException.class)) { + catch (IgniteException | IgniteCheckedException e) { + for (String name : res.servicesToRollback()) + depFuts.remove(name).onDone(e); + + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { if (log.isDebugEnabled()) - log.debug("Topology changed while deploying service (will retry): " + e.getMessage()); + log.debug("Topology changed while deploying services (will retry): " + e.getMessage()); + } + else { + res.onDone(new IgniteCheckedException( + new ServiceDeploymentException("Failed to deploy provided services.", e, cfgs))); - continue; + return res; } + } + finally { + if (ctx.deploy().enabled()) + ctx.cache().context().deploy().ignoreOwnership(false); + } + } - U.error(log, "Failed to deploy service: " + cfg.getName(), e); + if (ctx.clientDisconnected()) { + IgniteClientDisconnectedCheckedException err = + new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to deploy services, client node disconnected: " + cfgs); + + for (String name : res.servicesToRollback()) { + GridServiceDeploymentFuture fut = depFuts.remove(name); - return new GridFinishedFuture<>(e); + if (fut != null) + fut.onDone(err); } + + return new GridFinishedFuture<>(err); } + + if (failedFuts != null) { + for (GridServiceDeploymentFuture fut : failedFuts) + res.add(fut, false); + } + + res.serviceDeploymentMarkInitialized(); + + return res; } /** - * @return Compatibility state. + * @param res Resulting compound future. + * @param cfg Service configuration. + * @throws IgniteCheckedException If operation failed. */ - private ServicesCompatibilityState markCompatibilityStateAsUsed() { - while (true) { - ServicesCompatibilityState state = compatibilityState.get(); + private void writeServiceToCache(GridServiceDeploymentCompoundFuture res, ServiceConfiguration cfg) + throws IgniteCheckedException { + String name = cfg.getName(); - if (state.used) - return state; + GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg); - ServicesCompatibilityState newState = new ServicesCompatibilityState(state.srvcCompatibility, true); + GridServiceDeploymentFuture old = depFuts.putIfAbsent(name, fut); - if (compatibilityState.compareAndSet(state, newState)) - return newState; - } - } + try { + if (old != null) { + if (!old.configuration().equalsIgnoreNodeFilter(cfg)) + throw new IgniteCheckedException("Failed to deploy service (service already exists with different " + + "configuration) [deployed=" + old.configuration() + ", new=" + cfg + ']'); + else { + res.add(old, false); - /** - * @param name Service name. - * @return Future. - */ - public IgniteInternalFuture<?> cancel(String name) { - ctx.security().authorize(name, SecurityPermission.SERVICE_CANCEL, null); + return; + } + } - while (true) { - try { - GridFutureAdapter<?> fut = new GridFutureAdapter<>(); + GridServiceDeploymentKey key = new GridServiceDeploymentKey(name); + + GridServiceDeployment dep = (GridServiceDeployment)cache.getAndPutIfAbsent(key, + new GridServiceDeployment(ctx.localNodeId(), cfg)); - GridFutureAdapter<?> old; + if (dep != null) { + if (!dep.configuration().equalsIgnoreNodeFilter(cfg)) { + String err = "Failed to deploy service (service already exists with different " + + "configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']'; - if ((old = undepFuts.putIfAbsent(name, fut)) != null) - fut = old; + U.error(log, err); + + throw new IgniteCheckedException(err); + } else { - GridServiceDeploymentKey key = new GridServiceDeploymentKey(name); + res.add(fut, false); + + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE); + + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); - if (cache.getAndRemove(key) == null) { - // Remove future from local map if service was not deployed. - undepFuts.remove(name); + GridServiceAssignments assigns = (GridServiceAssignments)e.getValue(); - fut.onDone(); + if (assigns.name().equals(name)) { + fut.onDone(); + + depFuts.remove(name, fut); + + break; + } } } - - return fut; } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Topology changed while deploying service (will retry): " + e.getMessage()); + else + res.add(fut, true); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + + res.add(fut, false); + + depFuts.remove(name, fut); + + throw e; + } + } + + /** + * @param cfg Service configuration. + * @return Future for deployment. + */ + public IgniteInternalFuture<?> deploy(ServiceConfiguration cfg) { + A.notNull(cfg, "cfg"); + + return deployAll(Collections.singleton(cfg), false); + } + + /** + * @param name Service name. + * @return Future. + */ + public IgniteInternalFuture<?> cancel(String name) { + while (true) { + try { + return removeServiceFromCache(name).fut; } - catch (IgniteCheckedException e) { - log.error("Failed to undeploy service: " + name, e); + catch (IgniteException | IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { + if (log.isDebugEnabled()) + log.debug("Topology changed while cancelling service (will retry): " + e.getMessage()); + } else { + U.error(log, "Failed to undeploy service: " + name, e); - return new GridFinishedFuture<>(e); + return new GridFinishedFuture<>(e); + } } } } @@ -689,18 +786,73 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite public IgniteInternalFuture<?> cancelAll() { Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); - GridCompoundFuture res = null; + List<String> svcNames = new ArrayList<>(); while (it.hasNext()) { - Cache.Entry<Object, Object> e = it.next(); + GridServiceDeployment dep = (GridServiceDeployment)it.next().getValue(); - GridServiceDeployment dep = (GridServiceDeployment)e.getValue(); + svcNames.add(dep.configuration().getName()); + } + + return cancelAll(svcNames); + } + + /** + * @param svcNames Name of service to deploy. + * @return Future. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture<?> cancelAll(Collection<String> svcNames) { + List<String> svcNamesCp = new ArrayList<>(svcNames); - if (res == null) - res = new GridCompoundFuture<>(); + Collections.sort(svcNamesCp); + + GridCompoundFuture res; + + while (true) { + res = null; + + List<String> toRollback = new ArrayList<>(); + + try (Transaction tx = cache.txStart(PESSIMISTIC, READ_COMMITTED)) { + for (String name : svcNames) { + if (res == null) + res = new GridCompoundFuture<>(); + + try { + CancelResult cr = removeServiceFromCache(name); - // Cancel each service separately. - res.add(cancel(dep.configuration().getName())); + if (cr.rollback) + toRollback.add(name); + + res.add(cr.fut); + } + catch (IgniteException | IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class)) + throw e; // Retry. + else { + U.error(log, "Failed to undeploy service: " + name, e); + + res.add(new GridFinishedFuture<>(e)); + } + } + } + + tx.commit(); + + break; + } + catch (IgniteException | IgniteCheckedException e) { + for (String name : toRollback) + undepFuts.remove(name).onDone(e); + + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { + if (log.isDebugEnabled()) + log.debug("Topology changed while cancelling service (will retry): " + e.getMessage()); + } + else + return new GridFinishedFuture<>(e); + } } if (res != null) { @@ -713,6 +865,50 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** + * @param name Name of service to remove from internal cache. + * @return Cancellation future and a flag whether it should be completed and removed on error. + * @throws IgniteCheckedException If operation failed. + */ + private CancelResult removeServiceFromCache(String name) throws IgniteCheckedException { + try { + ctx.security().authorize(name, SecurityPermission.SERVICE_CANCEL, null); + } + catch (SecurityException e) { + return new CancelResult(new GridFinishedFuture<>(e), false); + } + + GridFutureAdapter<?> fut = new GridFutureAdapter<>(); + + GridFutureAdapter<?> old = undepFuts.putIfAbsent(name, fut); + + if (old != null) + return new CancelResult(old, false); + else { + GridServiceDeploymentKey key = new GridServiceDeploymentKey(name); + + try { + if (cache.getAndRemove(key) == null) { + // Remove future from local map if service was not deployed. + undepFuts.remove(name, fut); + + fut.onDone(); + + return new CancelResult(fut, false); + } + else + return new CancelResult(fut, true); + } + catch (IgniteCheckedException e) { + undepFuts.remove(name, fut); + + fut.onDone(e); + + throw e; + } + } + } + + /** * @param name Service name. * @param timeout If greater than 0 limits task execution time. Cannot be negative. * @return Service topology. @@ -1323,23 +1519,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** - * @param nodes Remote nodes. - */ - public void initCompatibilityMode(Collection<ClusterNode> nodes) { - boolean mode = false; - - if (srvcCompatibilitySysProp != null) - mode = srvcCompatibilitySysProp; - - while (true) { - ServicesCompatibilityState state = compatibilityState.get(); - - if (compatibilityState.compareAndSet(state, new ServicesCompatibilityState(mode, state.used))) - return; - } - } - - /** * Called right after utility cache is started and ready for the usage. */ public void onUtilityCacheStarted() { @@ -1365,7 +1544,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite @Override public void onUpdated(final Iterable<CacheEntryEvent<?, ?>> deps) { GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock; - if (busyLock == null || !busyLock.enterBusy()) + if (busyLock == null || !busyLock.enterBusy()) return; try { @@ -1385,27 +1564,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param evts Update events. */ private void onSystemCacheUpdated(final Iterable<CacheEntryEvent<?, ?>> evts) { - boolean firstTime = true; - for (CacheEntryEvent<?, ?> e : evts) { - if (e.getKey() instanceof GridServiceDeploymentKey) { - if (firstTime) { - markCompatibilityStateAsUsed(); - - firstTime = false; - } - + if (e.getKey() instanceof GridServiceDeploymentKey) processDeployment((CacheEntryEvent)e); - } - else if (e.getKey() instanceof GridServiceAssignmentsKey) { - if (firstTime) { - markCompatibilityStateAsUsed(); - - firstTime = false; - } - + else if (e.getKey() instanceof GridServiceAssignmentsKey) processAssignment((CacheEntryEvent)e); - } } } @@ -1578,8 +1741,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite Iterator<Cache.Entry<Object, Object>> it = serviceEntries( ServiceDeploymentPredicate.INSTANCE); - boolean firstTime = true; - while (it.hasNext()) { // If topology changed again, let next event handle it. AffinityTopologyVersion currTopVer0 = currTopVer; @@ -1596,12 +1757,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite Cache.Entry<Object, Object> e = it.next(); - if (firstTime) { - markCompatibilityStateAsUsed(); - - firstTime = false; - } - GridServiceDeployment dep = (GridServiceDeployment)e.getValue(); try { @@ -1789,6 +1944,26 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** * */ + private static class CancelResult { + /** */ + IgniteInternalFuture<?> fut; + + /** */ + boolean rollback; + + /** + * @param fut Future. + * @param rollback {@code True} if service was cancelled during current call. + */ + CancelResult(IgniteInternalFuture<?> fut, boolean rollback) { + this.fut = fut; + this.rollback = rollback; + } + } + + /** + * + */ private abstract class DepRunnable implements Runnable { /** {@inheritDoc} */ @Override public void run() { @@ -1935,24 +2110,4 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return serviceTopology(cache, svcName); } } - - /** - * - */ - private static class ServicesCompatibilityState { - /** */ - private final boolean srvcCompatibility; - - /** */ - private final boolean used; - - /** - * @param srvcCompatibility Services compatibility mode ({@code true} if compatible with old nodes). - * @param used Services has been used. - */ - ServicesCompatibilityState(boolean srvcCompatibility, boolean used) { - this.srvcCompatibility = srvcCompatibility; - this.used = used; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java new file mode 100644 index 0000000..a581e15 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/PreparedConfigurations.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.List; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.services.ServiceConfiguration; + +/** + * Result of services validation before deployment. + */ +class PreparedConfigurations { + /** */ + final List<ServiceConfiguration> cfgs; + + /** */ + final List<GridServiceDeploymentFuture> failedFuts; + + /** */ + final Exception err; + + /** + * @param cfgs Configurations to deploy. + * @param failedFuts Finished futures for failed configurations. + * @param err Error if need to stop deploy. + */ + PreparedConfigurations(List<ServiceConfiguration> cfgs, List<GridServiceDeploymentFuture> failedFuts, + Exception err) { + this.cfgs = cfgs; + this.failedFuts = failedFuts; + this.err = err; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PreparedConfigurations.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java new file mode 100644 index 0000000..32fbf6f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentException.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.Collection; +import org.apache.ignite.IgniteException; +import org.apache.ignite.services.ServiceConfiguration; +import org.jetbrains.annotations.Nullable; + +/** + * Exception indicating service deployment failure. + */ +public class ServiceDeploymentException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Collection<ServiceConfiguration> cfgs; + + /** + * Creates service deployment exception with error message. + * + * @param msg Error message. + * @param cfgs Configurations of services that failed to deploy. + */ + public ServiceDeploymentException(String msg, Collection<ServiceConfiguration> cfgs) { + super(msg); + + this.cfgs = cfgs; + } + + /** + * Creates service deployment exception with {@link Throwable} as a cause. + * + * @param cause Cause. + * @param cfgs Configurations of services that failed to deploy. + */ + public ServiceDeploymentException(Throwable cause, Collection<ServiceConfiguration> cfgs) { + super(cause); + + this.cfgs = cfgs; + } + + /** + * Creates service deployment exception with error message and {@link Throwable} as a cause. + * + * @param msg Error message. + * @param cause Cause. + * @param cfgs Configurations of services that failed to deploy. + */ + public ServiceDeploymentException(String msg, @Nullable Throwable cause, Collection<ServiceConfiguration> cfgs) { + super(msg, cause); + + this.cfgs = cfgs; + } + + /** + * @return Configurations of services that failed to deploy. + */ + public Collection<ServiceConfiguration> getFailedConfigurations() { + return cfgs; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 80cf67b..a724060 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -112,11 +112,11 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig } catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | ClusterTopologyCheckedException e) { - if (!ignoreFailure(e)) + if (!processFailure(e, fut)) onDone(e); } catch (IgniteCheckedException e) { - if (!ignoreFailure(e)) { + if (!processFailure(e, fut)) { if (e instanceof NodeStoppingException) logDebug(logger(), "Failed to execute compound future reducer, node stopped."); else @@ -183,6 +183,17 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig } /** + * Processes error thrown by some of the inner futures. + * + * @param err Thrown exception. + * @param fut Failed future. + * @return {@code True} if this error should be ignored. + */ + protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) { + return ignoreFailure(err); + } + + /** * Checks if there are pending futures. This is not the same as * {@link #isDone()} because child classes may override {@link #onDone(Object, Throwable)} * call and delay completion. http://git-wip-us.apache.org/repos/asf/ignite/blob/0b6da976/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java new file mode 100644 index 0000000..51c3407 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFutureSelfTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** */ +public class GridServiceDeploymentCompoundFutureSelfTest extends GridCommonAbstractTest { + /** */ + private static GridKernalContext ctx; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + IgniteKernal kernal = (IgniteKernal)startGrid(0); + + ctx = kernal.context(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testWaitForCompletionOnFailingFuturePartial() throws Exception { + GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(false, ctx); + + int failingFutsNum = 2; + + int completingFutsNum = 5; + + Collection<GridServiceDeploymentFuture> failingFuts = new ArrayList<>(completingFutsNum); + + for (int i = 0; i < failingFutsNum; i++) { + ServiceConfiguration failingCfg = config("Failed-" + i); + + GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg); + + failingFuts.add(failingFut); + + compFut.add(failingFut); + } + + List<GridFutureAdapter<Object>> futs = new ArrayList<>(completingFutsNum); + + for (int i = 0; i < completingFutsNum; i++) { + GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(config(String.valueOf(i))); + + futs.add(fut); + + compFut.add(fut); + } + + compFut.serviceDeploymentMarkInitialized(); + + List<Exception> causes = new ArrayList<>(); + + for (GridServiceDeploymentFuture fut : failingFuts) { + Exception cause = new Exception("Test error"); + + causes.add(cause); + + fut.onDone(cause); + } + + try { + compFut.get(100); + + fail("Should never reach here."); + } + catch (IgniteFutureTimeoutCheckedException e) { + log.info("Expected exception: " + e.getMessage()); + } + + for (GridFutureAdapter<Object> fut : futs) + fut.onDone(); + + try { + compFut.get(); + + fail("Should never reach here."); + } + catch (IgniteCheckedException ce) { + log.info("Expected exception: " + ce.getMessage()); + + IgniteException e = U.convertException(ce); + + assertTrue(e instanceof ServiceDeploymentException); + + Throwable[] supErrs = e.getSuppressed(); + + assertEquals(failingFutsNum, supErrs.length); + + for (int i = 0; i < failingFutsNum; i++) + assertEquals(causes.get(i), supErrs[i].getCause()); + } + } + + /** + * @throws Exception if failed. + */ + public void testFailAllAfterInitialized() throws Exception { + GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(true, ctx); + + ServiceConfiguration failingCfg = config("Failed"); + + GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg); + + compFut.add(failingFut); + + int futsNum = 5; + + List<ServiceConfiguration> cfgs = new ArrayList<>(futsNum + 1); + + cfgs.add(failingCfg); + + for (int i = 0; i < futsNum; i++) { + ServiceConfiguration cfg = config(String.valueOf(i)); + + cfgs.add(cfg); + + compFut.add(new GridServiceDeploymentFuture(cfg)); + } + + compFut.serviceDeploymentMarkInitialized(); + + Exception expCause = new Exception("Test error"); + + failingFut.onDone(expCause); + + assertFailAll(compFut, cfgs, expCause); + } + + /** + * @throws Exception if failed. + */ + public void testFailAllBeforeInitialized() throws Exception { + GridServiceDeploymentCompoundFuture compFut = new GridServiceDeploymentCompoundFuture(true, ctx); + + ServiceConfiguration failingCfg = config("Failed"); + + GridServiceDeploymentFuture failingFut = new GridServiceDeploymentFuture(failingCfg); + + Exception expCause = new Exception("Test error"); + + failingFut.onDone(expCause); + + compFut.add(failingFut); + + assertFalse(compFut.isDone()); + + int futsNum = 5; + + List<ServiceConfiguration> cfgs = new ArrayList<>(futsNum + 1); + + cfgs.add(failingCfg); + + for (int i = 0; i < futsNum; i++) { + ServiceConfiguration cfg = config(String.valueOf(i)); + + cfgs.add(cfg); + + compFut.add(new GridServiceDeploymentFuture(cfg)); + } + + compFut.serviceDeploymentMarkInitialized(); + + assertFailAll(compFut, cfgs, expCause); + } + + /** + * Try waiting for the future completion and check that a proper exception is thrown. + * + * @param fut Future. + * @param expCfgs Expected cfgs. + * @param expCause Expected cause. + */ + private void assertFailAll(GridServiceDeploymentCompoundFuture fut, Collection<ServiceConfiguration> expCfgs, + Exception expCause) { + try { + fut.get(); + + fail("Should never reach here."); + } + catch (IgniteCheckedException ce) { + log.info("Expected exception: " + ce.getMessage()); + + IgniteException e = U.convertException(ce); + + assertTrue(e instanceof ServiceDeploymentException); + + assertEqualsCollections(expCfgs, ((ServiceDeploymentException)e).getFailedConfigurations()); + + Throwable actCause = e.getCause(); + + assertTrue(actCause instanceof IgniteCheckedException); + + assertEquals(expCause, actCause.getCause()); + } + } + + /** + * @param name Name. + * @return Dummy configuration with a specified name. + */ + private ServiceConfiguration config(String name) { + ServiceConfiguration cfg = new ServiceConfiguration(); + + cfg.setName(name); + + return cfg; + } +}
