This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new edb32fa046a IGNITE-22994 Refactor subscription to
activation/deactivation events for internal components - Fixes #11480.
edb32fa046a is described below
commit edb32fa046a9707c5e9eeb90c0af59ffe9a04e2b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Aug 15 17:22:30 2024 +0300
IGNITE-22994 Refactor subscription to activation/deactivation events for
internal components - Fixes #11480.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../cluster/DistributedBaselineConfiguration.java | 16 +++++++++++-----
.../managers/encryption/GridEncryptionManager.java | 2 ++
.../dht/preloader/GridDhtPartitionsExchangeFuture.java | 8 ++++----
.../processors/cluster/GridClusterStateProcessor.java | 10 ++++------
.../datastructures/DataStructuresProcessor.java | 2 ++
.../persistence/DistributedMetaStorageImpl.java | 3 +++
.../subscription/GridInternalSubscriptionProcessor.java | 16 +++++++++++++++-
.../internal/processors/task/GridTaskProcessor.java | 4 +++-
8 files changed, 44 insertions(+), 17 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
index 4e0ca5ac4ff..2049e4055fd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.cluster;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
+import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
@@ -37,7 +38,7 @@ import static
org.apache.ignite.internal.processors.configuration.distributed.Di
/**
* Distributed baseline configuration.
*/
-public class DistributedBaselineConfiguration {
+public class DistributedBaselineConfiguration implements
IgniteChangeGlobalStateSupport {
/** Default auto-adjust timeout for persistence grid. */
private static final int DEFAULT_PERSISTENCE_TIMEOUT = 5 * 60_000;
@@ -104,6 +105,8 @@ public class DistributedBaselineConfiguration {
}
}
);
+
+ isp.registerGlobalStateListener(this);
}
/** */
@@ -116,10 +119,8 @@ public class DistributedBaselineConfiguration {
baselineAutoAdjustTimeout.addListener(lsnr);
}
- /**
- * Called when cluster performing activation.
- */
- public void onActivate() throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws
IgniteCheckedException {
if (log.isInfoEnabled())
log.info(format(AUTO_ADJUST_CONFIGURED_MESSAGE,
(isBaselineAutoAdjustEnabled() ? "enabled" : "disabled"),
@@ -127,6 +128,11 @@ public class DistributedBaselineConfiguration {
));
}
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) {
+ // No-op.
+ }
+
/**
* @return Value of manual baseline control or auto adjusting baseline.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index ff8fdecb764..3924c002b9f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -311,6 +311,8 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
}
});
+ ctx.internalSubscriptionProcessor().registerGlobalStateListener(this);
+
prepareMKChangeProc = new DistributedProcess<>(ctx,
MASTER_KEY_CHANGE_PREPARE, this::prepareMasterKeyChange,
this::finishPrepareMasterKeyChange);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ede7f87973d..03c21b3b614 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -109,6 +109,7 @@ import
org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanTags;
@@ -1382,15 +1383,14 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionBegin();
try {
- kctx.dataStructures().onDeActivate(kctx);
-
assert registerCachesFuture == null : "No caches
registration should be scheduled before new caches have started.";
registerCachesFuture =
cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
- kctx.encryption().onDeActivate(kctx);
+ GridInternalSubscriptionProcessor isp =
kctx.internalSubscriptionProcessor();
-
((IgniteChangeGlobalStateSupport)kctx.distributedMetastorage()).onDeActivate(kctx);
+ for (IgniteChangeGlobalStateSupport lsnr :
isp.getGlobalStateListeners())
+ lsnr.onDeActivate(kctx);
if (log.isInfoEnabled()) {
log.info("Successfully deactivated data structures,
services and caches [" +
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index d30319e1985..1b55eedebd9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -75,6 +75,7 @@ import
org.apache.ignite.internal.processors.cluster.baseline.autoadjust.Baselin
import
org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineTopologyUpdater;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import org.apache.ignite.internal.processors.security.SecurityUtils;
+import
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -1437,13 +1438,10 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
boolean client = ctx.clientNode();
try {
- ctx.dataStructures().onActivate(ctx);
+ GridInternalSubscriptionProcessor isp =
ctx.internalSubscriptionProcessor();
- ctx.task().onActivate(ctx);
-
- ctx.encryption().onActivate(ctx);
-
- distributedBaselineConfiguration.onActivate();
+ for (IgniteChangeGlobalStateSupport lsnr :
isp.getGlobalStateListeners())
+ lsnr.onActivate(ctx);
if (log.isInfoEnabled())
log.info("Successfully performed final activation
steps [nodeId="
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 4b9114f587d..0428e7a77e8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -259,6 +259,8 @@ public final class DataStructuresProcessor extends
GridProcessorAdapter implemen
/** {@inheritDoc} */
@Override public void start() {
ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT,
EVT_NODE_FAILED);
+
+ ctx.internalSubscriptionProcessor().registerGlobalStateListener(this);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 7cf1b8d5d62..afb5772609e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -279,6 +279,8 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
DistributedMetaStorageUpdateAckMessage.class,
this::onAckMessage
);
+
+ ctx.internalSubscriptionProcessor().registerGlobalStateListener(this);
}
/**
@@ -368,6 +370,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) {
+ // No-op.
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
index ba1b33c5c5d..056792311a7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import
org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
import
org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
@@ -53,7 +54,10 @@ public class GridInternalSubscriptionProcessor extends
GridProcessorAdapter {
* Listeners of distributed configuration controlled by
* {@link
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor}.
*/
- private List<DistributedConfigurationLifecycleListener>
distributedConfigurationListeners = new ArrayList<>();
+ private final List<DistributedConfigurationLifecycleListener>
distributedConfigurationListeners = new ArrayList<>();
+
+ /** */
+ private final List<IgniteChangeGlobalStateSupport> globalStateListeners =
new ArrayList<>();
/**
* @param ctx Kernal context.
@@ -122,4 +126,14 @@ public class GridInternalSubscriptionProcessor extends
GridProcessorAdapter {
public List<DistributedConfigurationLifecycleListener>
getDistributedConfigurationListeners() {
return distributedConfigurationListeners;
}
+
+ /** */
+ public void registerGlobalStateListener(@NotNull
IgniteChangeGlobalStateSupport globalStateListener) {
+ globalStateListeners.add(globalStateListener);
+ }
+
+ /** */
+ public List<IgniteChangeGlobalStateSupport> getGlobalStateListeners() {
+ return globalStateListeners;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 099acc974ea..799cadfeb02 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -200,6 +200,8 @@ public class GridTaskProcessor extends GridProcessorAdapter
implements IgniteCha
ctx.io().addMessageListener(TOPIC_TASK_CANCEL, new
TaskCancelMessageListener());
ctx.io().addMessageListener(TOPIC_TASK, new JobMessageListener(true));
+ ctx.internalSubscriptionProcessor().registerGlobalStateListener(this);
+
if (log.isDebugEnabled())
log.debug("Started task processor.");
}
@@ -1169,7 +1171,7 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
- onKernalStop(true);
+ // No-op.
}
/**