IGNITE-9607: Service Grid redesign - Phase 1 - Fixes #4434. Signed-off-by: Nikolay Izhikov <nizhi...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/62c560a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/62c560a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/62c560a5 Branch: refs/heads/ignite-601 Commit: 62c560a544a81a5e5b90099fa9da7ab51fc585fa Parents: 457090a Author: Vyacheslav Daradur <daradu...@gmail.com> Authored: Wed Dec 26 23:17:30 2018 +0300 Committer: Nikolay Izhikov <nizhi...@apache.org> Committed: Wed Dec 26 23:17:30 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 13 + .../apache/ignite/internal/GridComponent.java | 5 +- .../ignite/internal/GridKernalContext.java | 4 +- .../ignite/internal/GridKernalContextImpl.java | 12 +- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../apache/ignite/internal/IgniteKernal.java | 23 +- .../ignite/internal/IgniteNodeAttributes.java | 4 + .../communication/GridIoMessageFactory.java | 18 + .../discovery/GridDiscoveryManager.java | 23 + .../cache/DynamicCacheChangeBatch.java | 20 + .../processors/cache/GridCacheProcessor.java | 7 +- .../GridDhtPartitionsExchangeFuture.java | 4 +- .../cluster/ChangeGlobalStateMessage.java | 19 + .../cluster/GridClusterStateProcessor.java | 9 +- .../continuous/GridContinuousProcessor.java | 4 +- .../service/GridServiceAssignments.java | 3 + .../service/GridServiceAssignmentsKey.java | 3 + .../service/GridServiceDeployment.java | 3 + .../GridServiceDeploymentCompoundFuture.java | 15 +- .../service/GridServiceDeploymentFuture.java | 24 +- .../service/GridServiceDeploymentKey.java | 3 + .../service/GridServiceProcessor.java | 187 +- .../processors/service/GridServiceProxy.java | 10 +- .../service/IgniteServiceProcessor.java | 1784 ++++++++++++++++++ .../service/PreparedConfigurations.java | 7 +- .../service/ServiceChangeAbstractRequest.java | 53 + .../service/ServiceChangeBatchRequest.java | 113 ++ .../service/ServiceClusterDeploymentResult.java | 73 + .../ServiceClusterDeploymentResultBatch.java | 125 ++ .../service/ServiceDeploymentActions.java | 115 ++ .../service/ServiceDeploymentManager.java | 562 ++++++ .../service/ServiceDeploymentProcessId.java | 172 ++ .../service/ServiceDeploymentRequest.java | 56 + .../service/ServiceDeploymentTask.java | 859 +++++++++ .../service/ServiceDescriptorImpl.java | 4 + .../processors/service/ServiceInfo.java | 170 ++ .../service/ServiceProcessorAdapter.java | 162 ++ .../ServiceProcessorCommonDiscoveryData.java | 55 + .../ServiceProcessorJoinNodeDiscoveryData.java | 53 + .../ServiceSingleNodeDeploymentResult.java | 162 ++ .../ServiceSingleNodeDeploymentResultBatch.java | 155 ++ .../service/ServiceUndeploymentRequest.java | 42 + .../ignite/services/ServiceConfiguration.java | 2 + .../ignite/spi/discovery/tcp/ServerImpl.java | 44 + .../ignite/internal/GridDeploymentSelfTest.java | 2 +- .../IgniteClientReconnectServicesTest.java | 49 + .../GridDiscoveryManagerAttributesSelfTest.java | 12 + ...ridCacheContinuousQueryAbstractSelfTest.java | 9 +- .../GridServiceContinuousQueryRedeployTest.java | 6 +- ...ServiceDeploymentCompoundFutureSelfTest.java | 7 +- ...rviceDeploymentExceptionPropagationTest.java | 43 +- .../GridServiceProcessorAbstractSelfTest.java | 54 +- ...GridServiceProcessorBatchDeploySelfTest.java | 54 +- ...ServiceProcessorMultiNodeConfigSelfTest.java | 40 +- .../GridServiceProcessorMultiNodeSelfTest.java | 25 +- .../GridServiceProcessorProxySelfTest.java | 5 +- .../GridServiceProcessorSingleNodeSelfTest.java | 20 +- .../GridServiceProcessorStopSelfTest.java | 2 +- .../GridServiceReassignmentSelfTest.java | 18 +- .../GridServiceSerializationSelfTest.java | 2 +- ...gniteServiceConfigVariationsFullApiTest.java | 80 +- .../IgniteServiceDynamicCachesSelfTest.java | 26 +- .../service/IgniteServiceReassignmentTest.java | 50 +- ...tDiscoveryListenerNotificationOrderTest.java | 115 ++ ...tNonSerializableStaticConfigurationTest.java | 119 ++ .../ServiceDeploymentOnActivationTest.java | 110 +- ...ServiceDeploymentOnClientDisconnectTest.java | 228 +++ .../ServiceDeploymentOutsideBaselineTest.java | 12 +- .../ServiceDeploymentProcessAbstractTest.java | 127 ++ .../ServiceDeploymentProcessIdSelfTest.java | 105 ++ ...ploymentProcessingOnCoordinatorFailTest.java | 34 + ...ploymentProcessingOnCoordinatorLeftTest.java | 125 ++ ...viceDeploymentProcessingOnNodesFailTest.java | 34 + ...viceDeploymentProcessingOnNodesLeftTest.java | 117 ++ .../processors/service/ServiceInfoSelfTest.java | 137 ++ .../ServicePredicateAccessCacheTest.java | 52 +- .../ServiceReassignmentFunctionSelfTest.java | 220 +++ .../inner/LongInitializedTestService.java | 52 + .../junits/common/GridCommonAbstractTest.java | 33 + .../testsuites/IgniteKernalSelfTestSuite.java | 25 +- ...ServiceConfigVariationsFullApiTestSuite.java | 9 - .../mvcc/CacheMvccBasicContinuousQueryTest.java | 9 +- .../Services/ServicesTest.cs | 16 +- 83 files changed, 6919 insertions(+), 450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 4cfb361..f58f1aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1059,6 +1059,19 @@ public final class IgniteSystemProperties { public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE"; /** + * Manages the type of the implementation of the service processor (implementation of the {@link IgniteServices}). + * All nodes in the cluster must have the same value of this property. + * <p/> + * If the property is {@code true} then event-driven implementation of the service processor will be used. + * <p/> + * If the property is {@code false} then internal cache based implementation of service processor will be used. + * <p/> + * Default is {@code true}. + */ + public static final String IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED + = "IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED"; + + /** * When set to {@code true}, cache metrics are not included into the discovery metrics update message (in this * case message contains only cluster metrics). By default cache metrics are included into the message and * calculated each time the message is sent. http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 607217e..1f9e02e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -70,7 +70,10 @@ public interface GridComponent { CACHE_CRD_PROC, /** Encryption manager. */ - ENCRYPTION_MGR + ENCRYPTION_MGR, + + /** Service processor. */ + SERVICE_PROC } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index fdb8ebc..691fe37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.stat.IoStatisticsManager; import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -67,7 +68,6 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor; import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter; import org.apache.ignite.internal.processors.security.GridSecurityProcessor; import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; -import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; @@ -224,7 +224,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @return Service processor. */ - public GridServiceProcessor service(); + public ServiceProcessorAdapter service(); /** * Gets port processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 8131899..1219d00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; +import org.apache.ignite.internal.processors.service.ServiceProcessorAdapter; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; @@ -84,7 +85,6 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor; import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter; import org.apache.ignite.internal.processors.security.GridSecurityProcessor; import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; -import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; @@ -208,7 +208,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private GridServiceProcessor svcProc; + private ServiceProcessorAdapter srvcProc; /** */ @GridToStringInclude @@ -608,8 +608,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable portProc = (GridPortProcessor)comp; else if (comp instanceof GridClosureProcessor) closProc = (GridClosureProcessor)comp; - else if (comp instanceof GridServiceProcessor) - svcProc = (GridServiceProcessor)comp; + else if (comp instanceof ServiceProcessorAdapter) + srvcProc = (ServiceProcessorAdapter)comp; else if (comp instanceof IgniteScheduleProcessorAdapter) scheduleProc = (IgniteScheduleProcessorAdapter)comp; else if (comp instanceof GridSegmentationProcessor) @@ -762,8 +762,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public GridServiceProcessor service() { - return svcProc; + @Override public ServiceProcessorAdapter service() { + return srvcProc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 95d7717..437ee4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -136,7 +136,10 @@ public enum GridTopic { TOPIC_CACHE_COORDINATOR, /** */ - TOPIC_GEN_ENC_KEY; + TOPIC_GEN_ENC_KEY, + + /** */ + TOPIC_SERVICES; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 95bf49c..b84771a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -116,6 +116,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.internal.processors.GridProcessor; +import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -163,6 +164,7 @@ import org.apache.ignite.internal.processors.rest.GridRestProcessor; import org.apache.ignite.internal.processors.security.GridSecurityProcessor; import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; import org.apache.ignite.internal.processors.service.GridServiceProcessor; +import org.apache.ignite.internal.processors.service.IgniteServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; @@ -224,6 +226,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; import static org.apache.ignite.IgniteSystemProperties.IGNITE_REST_START_ON_CLIENT; @@ -251,6 +254,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; @@ -1016,7 +1020,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new GridCacheProcessor(ctx)); startProcessor(new GridQueryProcessor(ctx)); startProcessor(new ClientListenerProcessor(ctx)); - startProcessor(new GridServiceProcessor(ctx)); + startProcessor(createServiceProcessor()); startProcessor(new GridTaskSessionProcessor(ctx)); startProcessor(new GridJobProcessor(ctx)); startProcessor(new GridTaskProcessor(ctx)); @@ -1357,6 +1361,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** + * Creates service processor depend on {@link IgniteSystemProperties#IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED}. + * + * @return Service processor. + */ + private GridProcessorAdapter createServiceProcessor() { + final boolean srvcProcMode = getBoolean(IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, true); + + if (srvcProcMode) + return new IgniteServiceProcessor(ctx); + + return new GridServiceProcessor(ctx); + } + + /** * Validates common configuration parameters. * * @param cfg Configuration. @@ -1643,6 +1661,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.addNodeAttribute(e.getKey(), e.getValue()); } } + + ctx.addNodeAttribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED, + ctx.service() instanceof IgniteServiceProcessor); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 45ca234..1740072 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -208,6 +208,10 @@ public final class IgniteNodeAttributes { /** Supported features. */ public static final String ATTR_IGNITE_FEATURES = ATTR_PREFIX + ".features"; + /** Ignite services processor mode. */ + public static final String ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED = ATTR_PREFIX + + ".event.driven.service.processor.enabled"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b1c023a..5e7811b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -184,6 +184,9 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; +import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResult; +import org.apache.ignite.internal.processors.service.ServiceDeploymentProcessId; +import org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultBatch; import org.apache.ignite.internal.util.GridByteArrayList; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridLongList; @@ -1120,6 +1123,21 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 167: + msg = new ServiceDeploymentProcessId(); + + break; + + case 168: + msg = new ServiceSingleNodeDeploymentResultBatch(); + + break; + + case 169: + msg = new ServiceSingleNodeDeploymentResult(); + + break; + // [-3..119] [124..129] [-23..-28] [-36..-55] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 78319a0..556af19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -143,6 +143,7 @@ import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SECURITY_COMPATIBILITY_MODE; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -157,6 +158,7 @@ import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2; @@ -789,6 +791,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache); + ctx.service().onLocalJoin(discoEvt, discoCache); + ctx.authentication().onLocalJoin(); ctx.encryption().onLocalJoin(); @@ -847,6 +851,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache); + ctx.service().onLocalJoin(localJoinEvent(), discoCache); + ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() { @Override public void apply(IgniteFuture<?> fut) { try { @@ -1223,6 +1229,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); + Boolean locSrvcProcMode = locNode.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED); Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE); for (ClusterNode n : nodes) { @@ -1308,6 +1315,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", rmtAddrs=" + U.addressesAsString(n) + ", rmtNode=" + U.toShortString(n) + "]"); } + Boolean rmtSrvcProcModeAttr = n.attribute(ATTR_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED); + + final boolean rmtSrvcProcMode = rmtSrvcProcModeAttr != null ? rmtSrvcProcModeAttr : false; + + if (!F.eq(locSrvcProcMode, rmtSrvcProcMode)) { + throw new IgniteCheckedException("Local node's " + IGNITE_EVENT_DRIVEN_SERVICE_PROCESSOR_ENABLED + + " property value differs from remote node's value " + + "(to make sure all nodes in topology have identical service processor mode, " + + "configure system property explicitly) " + + "[locSrvcProcMode=" + locSrvcProcMode + + ", rmtSrvcProcMode=" + rmtSrvcProcMode + + ", locNodeAddrs=" + U.addressesAsString(locNode) + + ", rmtNodeAddrs=" + U.addressesAsString(n) + + ", locNodeId=" + locNode.id() + ", rmtNode=" + U.toShortString(n) + "]"); + } + if (n.version().compareToIgnoreTimestamp(SERVICE_PERMISSIONS_SINCE) >= 0 && ctx.security().enabled() // Matters only if security enabled. ) { http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index d85e29b..9dc1195 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.service.ServiceDeploymentActions; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -52,6 +54,10 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** Restarting caches. */ private Set<String> restartingCaches; + /** Affinity (cache related) services updates to be processed on services deployment process. */ + @GridToStringExclude + @Nullable private transient ServiceDeploymentActions serviceDeploymentActions; + /** * @param reqs Requests. */ @@ -118,6 +124,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** + * @return Services deployment actions to be processed on services deployment process. + */ + @Nullable public ServiceDeploymentActions servicesDeploymentActions() { + return serviceDeploymentActions; + } + + /** + * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process. + */ + public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) { + this.serviceDeploymentActions = serviceDeploymentActions; + } + + /** * @return {@code True} if required to start all caches on client node. */ public boolean startCaches() { http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b919024..1c44eaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -137,6 +137,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchang import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; import org.apache.ignite.internal.processors.security.SecurityContext; +import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.F0; @@ -1019,7 +1020,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!active) return; - ctx.service().onUtilityCacheStarted(); + if (ctx.service() instanceof GridServiceProcessor) + ((GridServiceProcessor)ctx.service()).onUtilityCacheStarted(); final AffinityTopologyVersion startTopVer = ctx.discovery().localJoin().joinTopologyVersion(); @@ -3047,7 +3049,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) { ctx.dataStructures().restoreStructuresState(ctx); - ctx.service().updateUtilityCache(); + if (ctx.service() instanceof GridServiceProcessor) + ((GridServiceProcessor)ctx.service()).updateUtilityCache(); } if (err == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index dbc51f7..d4d89bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -99,6 +99,7 @@ import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.TimeBag; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -1136,7 +1137,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext()); - cctx.kernalContext().service().onDeActivate(cctx.kernalContext()); + if (cctx.kernalContext().service() instanceof GridServiceProcessor) + ((GridServiceProcessor)cctx.kernalContext().service()).onDeActivate(cctx.kernalContext()); assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started."; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index 51a65bb..b1ff048 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.service.ServiceDeploymentActions; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -65,6 +66,10 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { @GridToStringExclude private transient ExchangeActions exchangeActions; + /** Services deployment actions to be processed on services deployment process. */ + @GridToStringExclude + @Nullable private transient ServiceDeploymentActions serviceDeploymentActions; + /** * @param reqId State change request ID. * @param initiatingNodeId Node initiated state change. @@ -117,6 +122,20 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { this.exchangeActions = exchangeActions; } + /** + * @return Services deployment actions to be processed on services deployment process. + */ + @Nullable public ServiceDeploymentActions servicesDeploymentActions() { + return serviceDeploymentActions; + } + + /** + * @param serviceDeploymentActions Services deployment actions to be processed on services deployment process. + */ + public void servicesDeploymentActions(ServiceDeploymentActions serviceDeploymentActions) { + this.serviceDeploymentActions = serviceDeploymentActions; + } + /** {@inheritDoc} */ @Override public IgniteUuid id() { return id; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 9d2adae..b347d39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; +import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -1165,9 +1166,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I Exception e = null; try { - ctx.service().onUtilityCacheStarted(); + if (ctx.service() instanceof GridServiceProcessor) { + GridServiceProcessor srvcProc = (GridServiceProcessor)ctx.service(); - ctx.service().onActivate(ctx); + srvcProc.onUtilityCacheStarted(); + + srvcProc.onActivate(ctx); + } ctx.dataStructures().onActivate(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 22d6997..2456efa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; +import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -295,7 +296,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.cacheObjects().onContinuousProcessorStarted(ctx); - ctx.service().onContinuousProcessorStarted(ctx); + if (ctx.service() instanceof GridServiceProcessor) + ((GridServiceProcessor)ctx.service()).onContinuousProcessorStarted(ctx); if (log.isDebugEnabled()) log.debug("Continuous processor started."); http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java index 66b2816..290fbf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignments.java @@ -30,7 +30,10 @@ import org.apache.ignite.services.ServiceConfiguration; /** * Service per-node assignment. + * + * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8. */ +@Deprecated public class GridServiceAssignments implements Serializable, GridCacheInternal { /** Serialization version. */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java index 5b26f9f..87cb0ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceAssignmentsKey.java @@ -22,7 +22,10 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Service configuration key. + * + * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8. */ +@Deprecated public class GridServiceAssignmentsKey extends GridCacheUtilityKey<GridServiceAssignmentsKey> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java index f25c38e..ae8670f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeployment.java @@ -25,7 +25,10 @@ import org.apache.ignite.services.ServiceConfiguration; /** * Service deployment. + * + * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8. */ +@Deprecated public class GridServiceDeployment implements GridCacheInternal, Serializable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java index 45ccc24..f776285 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentCompoundFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.service; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,9 +34,9 @@ import org.jetbrains.annotations.Nullable; * IgniteInternalFuture#get get()} method after all futures complete or fail. Inner exception will contain * configurations of failed services. */ -public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Object, Object> { - /** Names of services written to cache during current deployment. */ - private Collection<String> svcsToRollback; +public class GridServiceDeploymentCompoundFuture<T extends Serializable> extends GridCompoundFuture<Object, Object> { + /** Ids of services written to cache during current deployment. */ + private Collection<T> svcsToRollback; /** */ private volatile ServiceDeploymentException err; @@ -71,21 +72,21 @@ public class GridServiceDeploymentCompoundFuture extends GridCompoundFuture<Obje * @param fut Child future. * @param own If {@code true}, then corresponding service will be cancelled on failure. */ - public void add(GridServiceDeploymentFuture fut, boolean own) { + public void add(GridServiceDeploymentFuture<T> fut, boolean own) { super.add(fut); if (own) { if (svcsToRollback == null) svcsToRollback = new ArrayList<>(); - svcsToRollback.add(fut.configuration().getName()); + svcsToRollback.add(fut.serviceId()); } } /** - * @return Collection of names of services that were written to cache during current deployment. + * @return Collection of ids of services that were written to cache during current deployment. */ - public Collection<String> servicesToRollback() { + public Collection<T> servicesToRollback() { if (svcsToRollback != null) return svcsToRollback; else http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java index c87cc6e..ba4422e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.service; +import java.io.Serializable; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.services.ServiceConfiguration; @@ -24,15 +25,27 @@ import org.apache.ignite.services.ServiceConfiguration; /** * Service deployment future. */ -public class GridServiceDeploymentFuture extends GridFutureAdapter<Object> { +public class GridServiceDeploymentFuture<T extends Serializable> extends GridFutureAdapter<Object> { /** */ private final ServiceConfiguration cfg; + /** */ + private final T srvcId; + /** * @param cfg Configuration. */ public GridServiceDeploymentFuture(ServiceConfiguration cfg) { + this(cfg, null); + } + + /** + * @param cfg Configuration. + * @param srvcId Service id. + */ + public GridServiceDeploymentFuture(ServiceConfiguration cfg, T srvcId) { this.cfg = cfg; + this.srvcId = srvcId; } /** @@ -42,8 +55,15 @@ public class GridServiceDeploymentFuture extends GridFutureAdapter<Object> { return cfg; } + /** + * @return Service id. + */ + public T serviceId() { + return srvcId; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridServiceDeploymentFuture.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java index 80415bb..e5b85dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentKey.java @@ -22,7 +22,10 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Service configuration key. + * + * @deprecated Services internals use messages for deployment management instead of the utility cache, since Ignite 2.8. */ +@Deprecated public class GridServiceDeploymentKey extends GridCacheUtilityKey<GridServiceDeploymentKey> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 3adc282..8fcbe4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -61,7 +61,6 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; -import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; @@ -118,9 +117,18 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA /** * Grid service processor. + * <p/> + * Obsolete implementation of service processor, based on replicated system cache. + * <p/> + * NOTE: if you fix a bug in this class, please take a look in {@link IgniteServiceProcessor}, perhaps the class + * contains a similar block of code which also should be updated. + * + * @see IgniteServiceProcessor + * @deprecated Here is improved, but uncompatible implementation {@link IgniteServiceProcessor}, see IEP-17 for details. */ +@Deprecated @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions"}) -public class GridServiceProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport { +public class GridServiceProcessor extends ServiceProcessorAdapter implements IgniteChangeGlobalStateSupport { /** Time to wait before reassignment retries. */ private static final long RETRY_TIMEOUT = 1000; @@ -136,7 +144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite private final Map<String, Collection<ServiceContextImpl>> locSvcs = new HashMap<>(); /** Deployment futures. */ - private final ConcurrentMap<String, GridServiceDeploymentFuture> depFuts = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, GridServiceDeploymentFuture<String>> depFuts = new ConcurrentHashMap<>(); /** Deployment futures. */ private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap<>(); @@ -456,58 +464,38 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite throw new IgniteException("Service configuration check failed (" + desc + ")"); } - /** - * @param name Service name. - * @param svc Service. - * @return Future. - */ - public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service svc) { - return deployMultiple(prj, name, svc, 0, 1); + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc) { + return deployMultiple(prj, name, srvc, 0, 1); } - /** - * @param name Service name. - * @param svc Service. - * @return Future. - */ - public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service svc) { - return deployMultiple(prj, name, svc, 1, 1); + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc) { + return deployMultiple(prj, name, srvc, 1, 1); } - /** - * @param name Service name. - * @param svc Service. - * @param totalCnt Total count. - * @param maxPerNodeCnt Max per-node count. - * @return Future. - */ - public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service svc, int totalCnt, + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt, int maxPerNodeCnt) { ServiceConfiguration cfg = new ServiceConfiguration(); cfg.setName(name); - cfg.setService(svc); + cfg.setService(srvc); cfg.setTotalCount(totalCnt); cfg.setMaxPerNodeCount(maxPerNodeCnt); return deployAll(prj, Collections.singleton(cfg)); } - /** - * @param name Service name. - * @param svc Service. - * @param cacheName Cache name. - * @param affKey Affinity key. - * @return Future. - */ - public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName, + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName, Object affKey) { A.notNull(affKey, "affKey"); ServiceConfiguration cfg = new ServiceConfiguration(); cfg.setName(name); - cfg.setService(svc); + cfg.setService(srvc); cfg.setCacheName(cacheName); cfg.setAffinityKey(affKey); cfg.setTotalCount(1); @@ -522,13 +510,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param dfltNodeFilter Default NodeFilter. * @return Configurations to deploy. */ - private PreparedConfigurations prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs, + private PreparedConfigurations<String> prepareServiceConfigurations(Collection<ServiceConfiguration> cfgs, IgnitePredicate<ClusterNode> dfltNodeFilter) { List<ServiceConfiguration> cfgsCp = new ArrayList<>(cfgs.size()); Marshaller marsh = ctx.config().getMarshaller(); - List<GridServiceDeploymentFuture> failedFuts = null; + List<GridServiceDeploymentFuture<String>> failedFuts = null; for (ServiceConfiguration cfg : cfgs) { Exception err = null; @@ -578,7 +566,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite if (failedFuts == null) failedFuts = new ArrayList<>(); - GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg); + GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg); fut.onDone(err); @@ -586,15 +574,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - return new PreparedConfigurations(cfgsCp, failedFuts); + return new PreparedConfigurations<>(cfgsCp, failedFuts); } - /** - * @param prj Grid projection. - * @param cfgs Service configurations. - * @return Future for deployment. - */ - public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) { + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs) { if (prj == null) // Deploy to servers by default if no projection specified. return deployAll(cfgs, ctx.cluster().get().forServers().predicate()); @@ -614,11 +598,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite @Nullable IgnitePredicate<ClusterNode> dfltNodeFilter) { assert cfgs != null; - PreparedConfigurations srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter); + PreparedConfigurations<String> srvCfg = prepareServiceConfigurations(cfgs, dfltNodeFilter); List<ServiceConfiguration> cfgsCp = srvCfg.cfgs; - List<GridServiceDeploymentFuture> failedFuts = srvCfg.failedFuts; + List<GridServiceDeploymentFuture<String>> failedFuts = srvCfg.failedFuts; Collections.sort(cfgsCp, new Comparator<ServiceConfiguration>() { @Override public int compare(ServiceConfiguration cfg1, ServiceConfiguration cfg2) { @@ -626,10 +610,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } }); - GridServiceDeploymentCompoundFuture res; + GridServiceDeploymentCompoundFuture<String> res; while (true) { - res = new GridServiceDeploymentCompoundFuture(); + res = new GridServiceDeploymentCompoundFuture<>(); if (ctx.deploy().enabled()) ctx.cache().context().deploy().ignoreOwnership(true); @@ -684,7 +668,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite "Failed to deploy services, client node disconnected: " + cfgs); for (String name : res.servicesToRollback()) { - GridServiceDeploymentFuture fut = depFuts.remove(name); + GridServiceDeploymentFuture<String> fut = depFuts.remove(name); if (fut != null) fut.onDone(err); @@ -694,7 +678,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } if (failedFuts != null) { - for (GridServiceDeploymentFuture fut : failedFuts) + for (GridServiceDeploymentFuture<String> fut : failedFuts) res.add(fut, false); } @@ -708,13 +692,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite * @param cfg Service configuration. * @throws IgniteCheckedException If operation failed. */ - private void writeServiceToCache(GridServiceDeploymentCompoundFuture res, ServiceConfiguration cfg) + private void writeServiceToCache(GridServiceDeploymentCompoundFuture<String> res, ServiceConfiguration cfg) throws IgniteCheckedException { String name = cfg.getName(); - GridServiceDeploymentFuture fut = new GridServiceDeploymentFuture(cfg); + GridServiceDeploymentFuture<String> fut = new GridServiceDeploymentFuture<>(cfg); - GridServiceDeploymentFuture old = depFuts.putIfAbsent(name, fut); + GridServiceDeploymentFuture<String> old = depFuts.putIfAbsent(name, fut); try { if (old != null) { @@ -772,11 +756,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - /** - * @param name Service name. - * @return Future. - */ - public IgniteInternalFuture<?> cancel(String name) { + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> cancel(String name) { while (true) { try { return removeServiceFromCache(name).fut; @@ -795,10 +776,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - /** - * @return Future. - */ - public IgniteInternalFuture<?> cancelAll() { + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> cancelAll() { Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); List<String> svcNames = new ArrayList<>(); @@ -812,13 +791,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return cancelAll(svcNames); } - /** - * @param svcNames Name of service to deploy. - * @return Future. - */ + /** {@inheritDoc} */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> cancelAll(Collection<String> svcNames) { - List<String> svcNamesCp = new ArrayList<>(svcNames); + @Override public IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames) { + List<String> svcNamesCp = new ArrayList<>(servicesNames); Collections.sort(svcNamesCp); @@ -830,7 +806,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite List<String> toRollback = new ArrayList<>(); try (Transaction tx = serviceCache().txStart(PESSIMISTIC, READ_COMMITTED)) { - for (String name : svcNames) { + for (String name : servicesNames) { if (res == null) res = new GridCompoundFuture<>(); @@ -923,18 +899,13 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - /** - * @param name Service name. - * @param timeout If greater than 0 limits task execution time. Cannot be negative. - * @return Service topology. - * @throws IgniteCheckedException On error. - */ - public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException { IgniteInternalCache<Object, Object> cache = serviceCache(); ClusterNode node = cache.affinity().mapKeyToNode(name); - final ServiceTopologyCallable call = new ServiceTopologyCallable(name); + final ServiceTopologyCallable call = new ServiceTopologyCallable(name, pendingJobCtxs); return ctx.closure().callAsyncNoFailover( GridClosureCallMode.BROADCAST, @@ -958,10 +929,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return val != null ? val.assigns() : null; } - /** - * @return Collection of service descriptors. - */ - public Collection<ServiceDescriptor> serviceDescriptors() { + /** {@inheritDoc} */ + @Override public Collection<ServiceDescriptor> serviceDescriptors() { Collection<ServiceDescriptor> descs = new ArrayList<>(); Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE); @@ -992,12 +961,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return descs; } - /** - * @param name Service name. - * @param <T> Service type. - * @return Service by specified service name. - */ - public <T> T service(String name) { + /** {@inheritDoc} */ + @Override public <T> T service(String name) { ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null); Collection<ServiceContextImpl> ctxs; @@ -1024,11 +989,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - /** - * @param name Service name. - * @return Service by specified service name. - */ - public ServiceContextImpl serviceContext(String name) { + /** {@inheritDoc} */ + @Override public ServiceContextImpl serviceContext(String name) { Collection<ServiceContextImpl> ctxs; synchronized (locSvcs) { @@ -1051,17 +1013,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } } - /** - * @param prj Grid projection. - * @param name Service name. - * @param svcItf Service class. - * @param sticky Whether multi-node request should be done. - * @param timeout If greater than 0 limits service acquire time. Cannot be negative. - * @param <T> Service interface type. - * @return The proxy of a service by its name and class. - * @throws IgniteException If failed to create proxy. - */ - public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf, boolean sticky, long timeout) + /** {@inheritDoc} */ + @Override public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky, + long timeout) throws IgniteException { ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null); @@ -1072,16 +1026,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite Service svc = ctx.service(); if (svc != null) { - if (!svcItf.isAssignableFrom(svc.getClass())) + if (!srvcCls.isAssignableFrom(svc.getClass())) throw new IgniteException("Service does not implement specified interface [svcItf=" + - svcItf.getName() + ", svcCls=" + svc.getClass().getName() + ']'); + srvcCls.getName() + ", svcCls=" + svc.getClass().getName() + ']'); return (T)svc; } } } - return new GridServiceProxy<T>(prj, name, svcItf, sticky, timeout, ctx).proxy(); + return new GridServiceProxy<T>(prj, name, srvcCls, sticky, timeout, ctx).proxy(); } /** @@ -1097,12 +1051,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite return false; } - /** - * @param name Service name. - * @param <T> Service type. - * @return Services by specified service name. - */ - public <T> Collection<T> services(String name) { + /** {@inheritDoc} */ + @Override public <T> Collection<T> services(String name) { ctx.security().authorize(name, SecurityPermission.SERVICE_INVOKE, null); Collection<ServiceContextImpl> ctxs; @@ -1973,7 +1923,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite t = th; } - GridServiceDeploymentFuture fut = depFuts.get(assigns.name()); + GridServiceDeploymentFuture<String> fut = depFuts.get(assigns.name()); if (fut != null && fut.configuration().equalsIgnoreNodeFilter(assigns.configuration())) { depFuts.remove(assigns.name(), fut); @@ -2129,11 +2079,16 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite @LoggerResource private transient IgniteLogger log; + /** */ + private final List<ComputeJobContext> pendingCtxs; + /** * @param svcName Service name. + * @param pendingCtxs Pending compute job contexts that waiting for utility cache initialization. */ - public ServiceTopologyCallable(String svcName) { + public ServiceTopologyCallable(String svcName, List<ComputeJobContext> pendingCtxs) { this.svcName = svcName; + this.pendingCtxs = pendingCtxs; } /** {@inheritDoc} */ @@ -2141,8 +2096,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache(); if (cache == null) { - List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs; - synchronized (pendingCtxs) { // Double check cache reference after lock acqusition. cache = ignite.context().cache().utilityCache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index 4d98187..aa6ce44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -197,19 +197,15 @@ public class GridServiceProxy<T> implements Serializable { true).get(); } } - catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Service was not found or topology changed (will retry): " + e.getMessage()); - } catch (RuntimeException | Error e) { throw e; } catch (IgniteCheckedException e) { // Check if ignorable exceptions are in the cause chain. - Throwable ignorableCause = X.cause(e, GridServiceNotFoundException.class); + Throwable ignorableCause = X.cause(e, ClusterTopologyCheckedException.class); - if (ignorableCause == null) - ignorableCause = X.cause(e, ClusterTopologyCheckedException.class); + if (ignorableCause == null && ctx.service() instanceof GridServiceProcessor) + ignorableCause = X.cause(e, GridServiceNotFoundException.class); if (ignorableCause != null) { if (log.isDebugEnabled())