This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 52bfe7b IGNITE-13366 Maintenance Mode core functionality: Java API to
request MM, Ignite node entering MM - Fixes #8325.
52bfe7b is described below
commit 52bfe7bda3507b1172134278c2f823bf3b58088b
Author: Sergey Chugunov <[email protected]>
AuthorDate: Thu Oct 15 09:35:11 2020 +0300
IGNITE-13366 Maintenance Mode core functionality: Java API to request MM,
Ignite node entering MM - Fixes #8325.
Signed-off-by: Sergey Chugunov <[email protected]>
---
.../persistence/MetaStorageCompatibilityTest.java | 12 +-
.../apache/ignite/internal/GridKernalContext.java | 9 +
.../ignite/internal/GridKernalContextImpl.java | 14 +
.../org/apache/ignite/internal/IgniteKernal.java | 22 +-
.../internal/maintenance/MaintenanceFileStore.java | 229 ++++++++++++++
.../internal/maintenance/MaintenanceProcessor.java | 276 +++++++++++++++++
.../managers/discovery/GridDiscoveryManager.java | 11 +-
.../pagemem/store/IgnitePageStoreManager.java | 5 -
.../processors/cache/ClusterCachesInfo.java | 12 +-
.../CleanCacheStoresMaintenanceAction.java | 75 +++++
.../CorruptedPdsMaintenanceCallback.java | 79 +++++
.../GridCacheDatabaseSharedManager.java | 22 ++
.../persistence/file/FilePageStoreManager.java | 85 +++--
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../ignite/maintenance/MaintenanceAction.java | 52 ++++
.../ignite/maintenance/MaintenanceRegistry.java | 157 ++++++++++
.../apache/ignite/maintenance/MaintenanceTask.java | 88 ++++++
.../maintenance/MaintenanceWorkflowCallback.java | 68 ++++
.../apache/ignite/maintenance/package-info.java | 21 ++
.../discovery/isolated/IsolatedDiscoverySpi.java | 256 +++++++++++++++
.../spi/discovery/isolated/IsolatedNode.java | 153 +++++++++
.../ignite/cache/RebalanceCancellationTest.java | 26 +-
.../cache/WalModeChangeAdvancedSelfTest.java | 192 +++++++++++-
...ocalWalModeChangeDuringRebalancingSelfTest.java | 244 ++++++++++++---
.../persistence/MaintenanceRegistrySimpleTest.java | 345 +++++++++++++++++++++
.../persistence/pagemem/NoOpPageStoreManager.java | 5 -
.../ignite/testsuites/IgnitePdsTestSuite2.java | 5 +
27 files changed, 2367 insertions(+), 102 deletions(-)
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MetaStorageCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MetaStorageCompatibilityTest.java
index 803e41d..846864b 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MetaStorageCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MetaStorageCompatibilityTest.java
@@ -38,9 +38,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+
/**
* Tests migration of metastorage.
*/
@@ -87,7 +91,7 @@ public class MetaStorageCompatibilityTest extends IgnitePersistenceCompatibility
try (Ignite ig0 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_1))) {
try (Ignite ig1 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_2))) {
- assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().active(), 10_000));
+ assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().state() == ACTIVE, 10_000));
}
}
}
@@ -100,6 +104,7 @@ public class MetaStorageCompatibilityTest extends IgnitePersistenceCompatibility
* Tests that BLT can be changed and persisted after metastorage migration.
*/
@Test
+ @WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value =
"false")
public void testMigrationToNewBaselineSetNewBaselineAfterMigration()
throws Exception {
try {
U.delete(new File(U.defaultWorkDirectory()));
@@ -119,7 +124,7 @@ public class MetaStorageCompatibilityTest extends IgnitePersistenceCompatibility
try (Ignite ig0 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_1))) {
try (Ignite ig1 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_2))) {
- assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().active(), 10_000));
+ assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().state() == ACTIVE, 10_000));
}
}
}
@@ -132,6 +137,7 @@ public class MetaStorageCompatibilityTest extends IgnitePersistenceCompatibility
*
*/
@Test
+ @WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value =
"false")
public void testMigrationWithExceptionDuringTheProcess() throws Exception {
try {
U.delete(new File(U.defaultWorkDirectory()));
@@ -162,7 +168,7 @@ public class MetaStorageCompatibilityTest extends IgnitePersistenceCompatibility
try (Ignite ig0 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_1))) {
try (Ignite ig1 =
IgnitionEx.start(prepareConfig(getConfiguration(), CONSISTENT_ID_2))) {
- assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().active(), 10_000));
+ assertTrue(GridTestUtils.waitForCondition(() ->
ig1.cluster().state() == ACTIVE, 10_000));
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 1d65155..56f9765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
@@ -201,6 +203,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public GridMetricManager metric();
/**
+ * Gets maintenance registry.
+ *
+ * @return Maintenance registry.
+ */
+ public MaintenanceRegistry maintenanceRegistry();
+
+ /**
* Gets system view manager.
*
* @return Monitoring manager.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index fd46dfa..ae589ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -38,6 +39,7 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
@@ -104,6 +106,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
@@ -322,6 +325,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ private MaintenanceProcessor maintenanceProc;
+
+ /** */
+ @GridToStringExclude
private List<GridComponent> comps = new LinkedList<>();
/** */
@@ -695,6 +702,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
diagnosticProcessor = (DiagnosticProcessor)comp;
else if (comp instanceof DurableBackgroundTasksProcessor)
durableBackgroundTasksProcessor =
(DurableBackgroundTasksProcessor)comp;
+ else if (comp instanceof MaintenanceProcessor)
+ maintenanceProc = (MaintenanceProcessor) comp;
else if (!(comp instanceof DiscoveryNodeValidationProcessor
|| comp instanceof PlatformPluginProcessor))
assert (comp instanceof GridPluginComponent) : "Unknown manager
class: " + comp.getClass();
@@ -788,6 +797,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public MaintenanceRegistry maintenanceRegistry() {
+ return maintenanceProc;
+ }
+
+ /** {@inheritDoc} */
@Override public GridCacheProcessor cache() {
return cacheProc;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index aaeb8cc..50c1a55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
import org.apache.ignite.internal.managers.GridManager;
import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
@@ -228,6 +229,7 @@ import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.tracing.TracingConfigurationManager;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
@@ -1193,7 +1195,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Assign discovery manager to context before other processors
start so they
// are able to register custom event listener.
- final GridManager discoMgr = new GridDiscoveryManager(ctx);
+ GridManager discoMgr = new GridDiscoveryManager(ctx);
ctx.add(discoMgr, false);
@@ -1201,12 +1203,25 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// able to register custom event listener.
startManager(new GridEncryptionManager(ctx));
+ startProcessor(new PdsConsistentIdProcessor(ctx));
+
+ MaintenanceProcessor mntcProcessor = new MaintenanceProcessor(ctx);
+
+ startProcessor(mntcProcessor);
+
+ if (mntcProcessor.isMaintenanceMode()) {
+ ctx.config().setDiscoverySpi(new IsolatedDiscoverySpi());
+
+ discoMgr = new GridDiscoveryManager(ctx);
+
+ ctx.add(discoMgr, false);
+ }
+
// Start processors before discovery manager, so they will
// be able to start receiving messages once discovery completes.
try {
startProcessor(COMPRESSION.createOptional(ctx));
startProcessor(new GridMarshallerMappingProcessor(ctx));
- startProcessor(new PdsConsistentIdProcessor(ctx));
startProcessor(new MvccProcessorImpl(ctx));
startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
startProcessor(new GridAffinityProcessor(ctx));
@@ -1280,6 +1295,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
throw e;
}
+ // All components exept Discovery are started, time to check if
maintenance is still needed
+ mntcProcessor.prepareAndExecuteMaintenance();
+
gw.writeLock();
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceFileStore.java
new file mode 100644
index 0000000..02698a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceFileStore.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.maintenance;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.maintenance.MaintenanceTask;
+
+/**
+ * Provides API for durable storage of {@link MaintenanceTask}s and hides implementation details from higher levels.
+ *
+ * Human-readable storage format is rigid but simple.
+ * <ol>
+ *     <li>
+ *         Maintenance file with tasks is stored in work directory of node
+ *         under persistent store root defined by consistentId of node.
+ *     </li>
+ *     <li>
+ *         Each task is written to disk as a {@link String} on a separate line;
+ *         lines are separated by {@link System#lineSeparator()} and the file is encoded in UTF-8.
+ *     </li>
+ *     <li>
+ *         Task consists of two or three tab-separated parts: task name, task description and optional parameters.
+ *     </li>
+ * </ol>
+ *
+ * When the store is created in in-memory mode (persistence disabled) all its operations are no-ops.
+ */
+public class MaintenanceFileStore {
+    /** Name of the file with maintenance tasks, located in the node's persistent store directory. */
+    private static final String MAINTENANCE_FILE_NAME = "maintenance_tasks.mntc";
+
+    /** Separator between tasks in the file: one task per line. */
+    private static final String TASKS_SEPARATOR = System.lineSeparator();
+
+    /** Separator between parts of a single task. */
+    private static final String TASK_PARTS_SEPARATOR = "\t";
+
+    /** Maintenance task consists of at least two parts: name and description (user-readable part). */
+    private static final int MIN_MNTC_TASK_PARTS_COUNT = 2;
+
+    /**
+     * Maintenance task consists of two or three parts: name, description (user-readable part)
+     * and optional task parameters.
+     */
+    private static final int MAX_MNTC_TASK_PARTS_COUNT = 3;
+
+    /** If {@code true}, persistence is disabled and the store does nothing. */
+    private final boolean inMemoryMode;
+
+    /** Resolver of the node's persistent store folders; not used in in-memory mode. */
+    private final PdsFoldersResolver pdsFoldersResolver;
+
+    /** File with maintenance tasks, {@code null} until {@link #init()} resolves it. */
+    private volatile File mntcTasksFile;
+
+    /** IO for {@link #mntcTasksFile}, opened in {@link #init()} and closed in {@link #stop()}. */
+    private volatile FileIO mntcTasksFileIO;
+
+    /** Factory used to open {@link #mntcTasksFile}; not used in in-memory mode. */
+    private final FileIOFactory ioFactory;
+
+    /** Tasks kept in sync with the maintenance file, keyed by task name. */
+    private final Map<String, MaintenanceTask> tasksInSync = new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param inMemoryMode {@code True} if persistence is disabled; all operations become no-ops then.
+     * @param pdsFoldersResolver Resolver of persistent store folders, may be {@code null} in in-memory mode.
+     * @param ioFactory Factory to open the maintenance file, may be {@code null} in in-memory mode.
+     * @param log Logger, may be {@code null} in in-memory mode.
+     */
+    public MaintenanceFileStore(boolean inMemoryMode,
+        PdsFoldersResolver pdsFoldersResolver,
+        FileIOFactory ioFactory,
+        IgniteLogger log) {
+        this.inMemoryMode = inMemoryMode;
+        this.pdsFoldersResolver = pdsFoldersResolver;
+        this.ioFactory = ioFactory;
+        this.log = log;
+    }
+
+    /**
+     * Resolves the node's persistent store directory, creates the maintenance file there if it is missing,
+     * opens it and loads previously stored tasks. Does nothing in in-memory mode.
+     *
+     * @throws IgniteCheckedException If resolving or creating the store directory fails.
+     * @throws IOException If the maintenance file cannot be created, opened or read.
+     */
+    public void init() throws IgniteCheckedException, IOException {
+        if (inMemoryMode)
+            return;
+
+        PdsFolderSettings folderSettings = pdsFoldersResolver.resolveFolders();
+        File storeDir = new File(folderSettings.persistentStoreRootPath(), folderSettings.folderName());
+        U.ensureDirectory(storeDir, "store directory for node persistent data", log);
+
+        mntcTasksFile = new File(storeDir, MAINTENANCE_FILE_NAME);
+
+        if (!mntcTasksFile.exists())
+            mntcTasksFile.createNewFile();
+
+        mntcTasksFileIO = ioFactory.create(mntcTasksFile);
+
+        readTasksFromFile();
+    }
+
+    /**
+     * Deletes file with maintenance tasks and forgets all tasks known to this store,
+     * so that the in-memory view matches the (now absent) file.
+     * A failure to delete the file is logged, not thrown.
+     */
+    public void clear() {
+        tasksInSync.clear();
+
+        File file = mntcTasksFile;
+
+        if (file != null && !file.delete() && file.exists())
+            log.warning("Failed to delete maintenance tasks file: " + file.getAbsolutePath());
+    }
+
+    /**
+     * Closes the maintenance file. Does nothing in in-memory mode or if the file was never opened.
+     *
+     * @throws IOException If closing the file fails.
+     */
+    public void stop() throws IOException {
+        if (inMemoryMode)
+            return;
+
+        if (mntcTasksFileIO != null)
+            mntcTasksFileIO.close();
+    }
+
+    /**
+     * Loads tasks from the maintenance file into {@link #tasksInSync}.
+     * Blank lines are ignored; lines without two or three tab-separated parts are logged and skipped.
+     *
+     * @throws IOException If reading the file fails.
+     */
+    private void readTasksFromFile() throws IOException {
+        int len = (int)mntcTasksFileIO.size();
+
+        if (len == 0)
+            return;
+
+        byte[] allBytes = new byte[len];
+
+        // read() may return fewer bytes than requested; readFully() keeps reading until the buffer is filled.
+        mntcTasksFileIO.readFully(allBytes, 0, len);
+
+        String[] allTasks = new String(allBytes, StandardCharsets.UTF_8).split(TASKS_SEPARATOR);
+
+        for (String taskStr : allTasks) {
+            if (taskStr.isEmpty())
+                continue;
+
+            String[] subStrs = taskStr.split(TASK_PARTS_SEPARATOR);
+
+            int partsNum = subStrs.length;
+
+            if (partsNum < MIN_MNTC_TASK_PARTS_COUNT) {
+                log.warning("Corrupted maintenance task found and will be skipped, " +
+                    "mandatory parts are missing: " + taskStr);
+
+                continue;
+            }
+
+            if (partsNum > MAX_MNTC_TASK_PARTS_COUNT) {
+                log.warning("Corrupted maintenance task found and will be skipped, " +
+                    "too many parts in task: " + taskStr);
+
+                continue;
+            }
+
+            String name = subStrs[0];
+
+            MaintenanceTask task = new MaintenanceTask(name, subStrs[1],
+                partsNum == MAX_MNTC_TASK_PARTS_COUNT ? subStrs[2] : null);
+
+            tasksInSync.put(name, task);
+        }
+    }
+
+    /**
+     * Rewrites the maintenance file from scratch with the current contents of {@link #tasksInSync}
+     * and forces the data to disk.
+     *
+     * Synchronized because the file is truncated and then rewritten: concurrent updates must not interleave.
+     *
+     * @throws IOException If writing the file fails.
+     */
+    private synchronized void writeTasksToFile() throws IOException {
+        mntcTasksFileIO.clear();
+
+        String allTasks = tasksInSync.values().stream()
+            .map(
+                task -> task.name() +
+                    TASK_PARTS_SEPARATOR +
+                    task.description() +
+                    TASK_PARTS_SEPARATOR +
+                    (task.parameters() != null ? task.parameters() : "")
+            )
+            .collect(Collectors.joining(TASKS_SEPARATOR));
+
+        byte[] allTasksBytes = allTasks.getBytes(StandardCharsets.UTF_8);
+
+        // writeFully() keeps writing until the whole range is written.
+        mntcTasksFileIO.writeFully(allTasksBytes, 0, allTasksBytes.length);
+
+        mntcTasksFileIO.force();
+    }
+
+    /**
+     * @return Unmodifiable live view of all tasks known to this store, keyed by task name;
+     *      always empty in in-memory mode.
+     */
+    public Map<String, MaintenanceTask> getAllTasks() {
+        if (inMemoryMode)
+            return Collections.emptyMap();
+
+        return Collections.unmodifiableMap(tasksInSync);
+    }
+
+    /**
+     * Adds or replaces (by name) a task and persists the updated task set. Does nothing in in-memory mode.
+     *
+     * @param task Task to store.
+     * @throws IOException If writing the file fails.
+     */
+    public void writeMaintenanceTask(MaintenanceTask task) throws IOException {
+        if (inMemoryMode)
+            return;
+
+        tasksInSync.put(task.name(), task);
+
+        writeTasksToFile();
+    }
+
+    /**
+     * Removes a task by name and persists the updated task set. Does nothing in in-memory mode.
+     *
+     * @param taskName Name of the task to remove.
+     * @throws IOException If writing the file fails.
+     */
+    public void deleteMaintenanceTask(String taskName) throws IOException {
+        if (inMemoryMode)
+            return;
+
+        tasksInSync.remove(taskName);
+
+        writeTasksToFile();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
new file mode 100644
index 0000000..8f85ceb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.maintenance;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class MaintenanceProcessor extends GridProcessorAdapter implements
MaintenanceRegistry {
+ /** */
+ private static final String IN_MEMORY_MODE_ERR_MSG = "Maintenance Mode is
not supported for in-memory clusters";
+
+ /**
+ * Active {@link MaintenanceTask}s are the ones that were read from disk
when node entered Maintenance Mode.
+ */
+ private final Map<String, MaintenanceTask> activeTasks = new
ConcurrentHashMap<>();
+
+ /**
+ * Requested {@link MaintenanceTask}s are collection of tasks requested by
user
+ * or other components when node operates normally (not in Maintenance
Mode).
+ */
+ private final Map<String, MaintenanceTask> requestedTasks = new
ConcurrentHashMap<>();
+
+ /** */
+ private final Map<String, MaintenanceWorkflowCallback> workflowCallbacks =
new ConcurrentHashMap<>();
+
+ /** */
+ private final MaintenanceFileStore fileStorage;
+
+ /** */
+ private final boolean inMemoryMode;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public MaintenanceProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ inMemoryMode = !CU.isPersistenceEnabled(ctx.config());
+
+ if (inMemoryMode) {
+ fileStorage = new MaintenanceFileStore(true,
+ null,
+ null,
+ null);
+
+ return;
+ }
+
+ fileStorage = new MaintenanceFileStore(false,
+ ctx.pdsFolderResolver(),
+ ctx.config().getDataStorageConfiguration().getFileIOFactory(),
+ log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable MaintenanceTask
registerMaintenanceTask(MaintenanceTask task) throws IgniteCheckedException {
+ if (inMemoryMode)
+ throw new IgniteCheckedException(IN_MEMORY_MODE_ERR_MSG);
+
+ if (isMaintenanceMode())
+ throw new IgniteCheckedException("Node is already in Maintenance
Mode, " +
+ "registering additional maintenance task is not allowed in
Maintenance Mode.");
+
+ MaintenanceTask oldTask = requestedTasks.put(task.name(), task);
+
+ if (oldTask != null) {
+ log.info(
+ "Maintenance Task with name " + task.name() +
+ " is already registered" +
+ oldTask.parameters() != null ? " with parameters " +
oldTask.parameters() : "" + "." +
+ " It will be replaced with new task" +
+ task.parameters() != null ? " with parameters " +
task.parameters() : "" + "."
+ );
+ }
+
+ try {
+ fileStorage.writeMaintenanceTask(task);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to register maintenance
task " + task, e);
+ }
+
+ return oldTask;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ try {
+ fileStorage.stop();
+ }
+ catch (IOException e) {
+ log.warning("Failed to free maintenance file resources", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (inMemoryMode)
+ return;
+
+ try {
+ fileStorage.init();
+
+ activeTasks.putAll(fileStorage.getAllTasks());
+ }
+ catch (Throwable t) {
+ log.warning("Caught exception when starting MaintenanceProcessor,"
+
+ " maintenance mode won't be entered", t);
+
+ activeTasks.clear();
+
+ fileStorage.clear();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareAndExecuteMaintenance() {
+ if (isMaintenanceMode()) {
+ workflowCallbacks.entrySet().removeIf(cbE ->
+ {
+ if (!cbE.getValue().shouldProceedWithMaintenance()) {
+ unregisterMaintenanceTask(cbE.getKey());
+
+ return true;
+ }
+
+ return false;
+ }
+ );
+ }
+
+ if (!workflowCallbacks.isEmpty())
+ proceedWithMaintenance();
+ else {
+ if (log.isInfoEnabled())
+ log.info("All maintenance tasks are fixed, no need to enter
maintenance mode. " +
+ "Restart the node to get it back to normal operations.");
+ }
+ }
+
+ /**
+ * Handles all {@link MaintenanceTask maintenance tasks} left
+ * after {@link MaintenanceRegistry#prepareAndExecuteMaintenance()} check.
+ *
+ * If a task defines an action that should be started automatically (e.g.
defragmentation starts automatically,
+ * no additional confirmation from user is required), it is executed.
+ *
+ * Otherwise waits for user to trigger actions for maintenance tasks.
+ */
+ private void proceedWithMaintenance() {
+ for (Map.Entry<String, MaintenanceWorkflowCallback> cbE :
workflowCallbacks.entrySet()) {
+ MaintenanceAction mntcAction = cbE.getValue().automaticAction();
+
+ if (mntcAction != null) {
+ try {
+ mntcAction.execute();
+ }
+ catch (Throwable t) {
+ log.warning("Failed to execute automatic action for
maintenance task: " +
+ activeTasks.get(cbE.getKey()), t);
+
+ throw t;
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable MaintenanceTask activeMaintenanceTask(String
maitenanceTaskName) {
+ return activeTasks.get(maitenanceTaskName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMaintenanceMode() {
+ return !activeTasks.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unregisterMaintenanceTask(String
maintenanceTaskName) {
+ if (inMemoryMode)
+ return;
+
+ if (isMaintenanceMode())
+ activeTasks.remove(maintenanceTaskName);
+ else
+ requestedTasks.remove(maintenanceTaskName);
+
+ try {
+ fileStorage.deleteMaintenanceTask(maintenanceTaskName);
+ }
+ catch (IOException e) {
+ log.warning("Failed to clear maintenance task with name "
+ + maintenanceTaskName
+ + " from file, whole file will be deleted", e
+ );
+
+ fileStorage.clear();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerWorkflowCallback(@NotNull String
maintenanceTaskName, @NotNull MaintenanceWorkflowCallback cb) {
+ if (inMemoryMode)
+ throw new IgniteException(IN_MEMORY_MODE_ERR_MSG);
+
+ List<MaintenanceAction> actions = cb.allActions();
+
+ if (actions == null || actions.isEmpty())
+ throw new IgniteException("Maintenance workflow callback should
provide at least one mainetance action");
+
+ int size = actions.size();
+ long distinctSize = actions.stream().map(a ->
a.name()).distinct().count();
+
+ if (distinctSize < size)
+ throw new IgniteException("All actions of a single workflow should
have unique names: " +
+ actions.stream().map(a ->
a.name()).collect(Collectors.joining(", ")));
+
+ Optional<String> wrongActionName = actions
+ .stream()
+ .map(MaintenanceAction::name)
+ .filter(name -> !U.alphanumericUnderscore(name))
+ .findFirst();
+
+ if (wrongActionName.isPresent())
+ throw new IgniteException(
+ "All actions' names should contain only alphanumeric and
underscore symbols: "
+ + wrongActionName.get());
+
+ workflowCallbacks.put(maintenanceTaskName, cb);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<MaintenanceAction> actionsForMaintenanceTask(String
maintenanceTaskName) {
+ if (inMemoryMode)
+ throw new IgniteException(IN_MEMORY_MODE_ERR_MSG);
+
+ if (!activeTasks.containsKey(maintenanceTaskName))
+ throw new IgniteException("Maintenance workflow callback for given
task name not found, " +
+ "cannot retrieve maintenance actions for it: " +
maintenanceTaskName);
+
+ return workflowCallbacks.get(maintenanceTaskName).allActions();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e791564..0a8c857 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
@@ -292,10 +293,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
- ctx.systemView().registerView(NODES_SYS_VIEW, NODES_SYS_VIEW_DESC,
- new ClusterNodeViewWalker(),
- () -> F.concat(false, allNodes(), daemonNodes()),
- ClusterNodeView::new);
+ if (ctx.systemView().view(NODES_SYS_VIEW) == null) {
+ ctx.systemView().registerView(NODES_SYS_VIEW, NODES_SYS_VIEW_DESC,
+ new ClusterNodeViewWalker(),
+ () -> F.concat(false, allNodes(), daemonNodes()),
+ ClusterNodeView::new);
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 243d99d..8216b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -224,11 +224,6 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
public boolean hasIndexStore(int grpId);
/**
- * @param grpDesc Cache group descriptor.
- */
- public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc);
-
- /**
* Calculates number of pages currently allocated for given cache group.
*
* @param grpId cache group id.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index f773f4d..88a9fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -2222,6 +2222,13 @@ public class ClusterCachesInfo {
Map<String, Integer> caches =
Collections.singletonMap(startedCacheCfg.getName(), cacheId);
boolean persistent = resolvePersistentFlag(exchActions,
startedCacheCfg);
+ boolean walGloballyEnabled = false;
+
+ // client nodes cannot read wal enabled/disabled status so they should
use default one
+ if (ctx.clientNode())
+ walGloballyEnabled = persistent;
+ else if (persistent)
+ walGloballyEnabled =
ctx.cache().context().database().walEnabled(grpId, false);
CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
startedCacheCfg,
@@ -2232,7 +2239,7 @@ public class ClusterCachesInfo {
deploymentId,
caches,
persistent,
- persistent,
+ walGloballyEnabled,
null,
cacheCfgEnrichment
);
@@ -2240,9 +2247,6 @@ public class ClusterCachesInfo {
if (startedCacheCfg.isEncryptionEnabled())
ctx.encryption().beforeCacheGroupStart(grpId, encKey);
- if (ctx.cache().context().pageStore() != null)
- ctx.cache().context().pageStore().beforeCacheGroupStart(grpDesc);
-
CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc);
assert old == null : old;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CleanCacheStoresMaintenanceAction.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CleanCacheStoresMaintenanceAction.java
new file mode 100644
index 0000000..de94e6e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CleanCacheStoresMaintenanceAction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+
+/**
+ *
+ */
+public class CleanCacheStoresMaintenanceAction implements
MaintenanceAction<Void> {
+ /** */
+ public static final String ACTION_NAME = "clean_data_files";
+
+ /** */
+ private final File rootStoreDir;
+
+ /** */
+ private final String[] cacheStoreDirs;
+
+ /**
+ * @param rootStoreDir
+ * @param cacheStoreDirs
+ */
+ public CleanCacheStoresMaintenanceAction(File rootStoreDir, String[]
cacheStoreDirs) {
+ this.rootStoreDir = rootStoreDir;
+ this.cacheStoreDirs = cacheStoreDirs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void execute() {
+ for (String cacheStoreDirName : cacheStoreDirs) {
+ File cacheStoreDir = new File(rootStoreDir, cacheStoreDirName);
+
+ if (cacheStoreDir.exists() && cacheStoreDir.isDirectory()) {
+ for (File file : cacheStoreDir.listFiles()) {
+ if (!file.getName().equals(CACHE_DATA_FILENAME))
+ file.delete();
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull String name() {
+ return ACTION_NAME;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable String description() {
+ return "Cleans data files of cache groups";
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedPdsMaintenanceCallback.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedPdsMaintenanceCallback.java
new file mode 100644
index 0000000..52a8f6f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CorruptedPdsMaintenanceCallback.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
+import org.jetbrains.annotations.NotNull;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
+
+/**
+ *
+ */
+public class CorruptedPdsMaintenanceCallback implements
MaintenanceWorkflowCallback {
+ /** */
+ private final File workDir;
+
+ /** */
+ private final List<String> cacheStoreDirs;
+
+ /**
+ * @param workDir
+ * @param cacheStoreDirs
+ */
+ public CorruptedPdsMaintenanceCallback(@NotNull File workDir,
+ @NotNull List<String>
cacheStoreDirs)
+ {
+ this.workDir = workDir;
+ this.cacheStoreDirs = cacheStoreDirs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldProceedWithMaintenance() {
+ for (String cacheStoreDirName : cacheStoreDirs) {
+ File cacheStoreDir = new File(workDir, cacheStoreDirName);
+
+ if (cacheStoreDir.exists()
+ && cacheStoreDir.isDirectory()
+ && cacheStoreDir.listFiles().length > 0
+ ) {
+ for (File f : cacheStoreDir.listFiles()) {
+ if (!f.getName().equals(CACHE_DATA_FILENAME))
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<MaintenanceAction> allActions() {
+ return Arrays.asList(new CleanCacheStoresMaintenanceAction(workDir,
cacheStoreDirs.toArray(new String[0])));
+ }
+
+ /** {@inheritDoc} */
+ @Override public MaintenanceAction automaticAction() {
+ return null;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 2214c62..50224f3 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -29,6 +29,7 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -50,6 +51,7 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
+
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
@@ -147,6 +149,8 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.spi.systemview.view.MetastorageView;
import org.apache.ignite.transactions.TransactionState;
@@ -170,6 +174,7 @@ import static
org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
import static
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_LOCK_HOLD_COUNT;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
/**
@@ -1574,6 +1579,23 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
if (kctx.clientNode())
return;
+ MaintenanceRegistry mntcRegistry = kctx.maintenanceRegistry();
+
+ MaintenanceTask mntcTask =
mntcRegistry.activeMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
+
+ if (mntcTask != null) {
+ log.warning("Maintenance task found, stop restoring memory");
+
+ File workDir = ((FilePageStoreManager) cctx.pageStore()).workDir();
+
+
mntcRegistry.registerWorkflowCallback(CORRUPTED_DATA_FILES_MNTC_TASK_NAME,
+ new CorruptedPdsMaintenanceCallback(workDir,
+ Arrays.asList(mntcTask.parameters().split(File.separator)))
+ );
+
+ return;
+ }
+
checkpointReadLock();
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7f6269d..024f52c 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -33,6 +33,7 @@ import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardCopyOption;
import java.util.AbstractList;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -50,6 +51,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -80,12 +82,12 @@ import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCa
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.GridStripedReadWriteLock;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.thread.IgniteThread;
@@ -143,6 +145,9 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
public static final PathMatcher TMP_FILE_MATCHER =
FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX);
+ /** Unique name for corrupted data files maintenance task. */
+ public static final String CORRUPTED_DATA_FILES_MNTC_TASK_NAME =
"corrupted-cache-data-files-task";
+
/** Listeners of configuration changes e.g. overwrite or remove actions. */
private final List<BiConsumer<String, File>> lsnrs = new
CopyOnWriteArrayList<>();
@@ -358,6 +363,31 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
/** {@inheritDoc} */
@Override public void beginRecover() {
+ List<String> groupsWithWalDisabled = checkCachesWithDisabledWal();
+
+ if (!groupsWithWalDisabled.isEmpty()) {
+ String errorMsg = "Cache groups with potentially corrupted
partition files found. " +
+ "To cleanup them maintenance is needed, node will enter
maintenance mode on next restart. " +
+ "Cleanup cache group folders manually or trigger maintenance
action to do that and restart the node. " +
+ "Corrupted files are located in subdirectories " +
groupsWithWalDisabled +
+ " in a work dir " + storeWorkDir;
+
+ log.warning(errorMsg);
+
+ try {
+ cctx.kernalContext().maintenanceRegistry()
+ .registerMaintenanceTask(
+ new
MaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME,
+ "Corrupted cache groups found",
+
groupsWithWalDisabled.stream().collect(Collectors.joining(File.separator)))
+ );
+ } catch (IgniteCheckedException e) {
+ log.warning("Failed to register maintenance record for
corrupted partition files.", e);
+ }
+
+ throw new IgniteException(errorMsg);
+ }
+
for (CacheStoreHolder holder : idxCacheStores.values()) {
holder.idxStore.beginRecover();
@@ -366,6 +396,38 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
}
}
+    /**
+     * Finds persistent cache groups that had WAL disabled (locally or globally) before node stop
+     * and whose store directory contains files other than {@link #CACHE_DATA_FILENAME}.
+     *
+     * @return Store directory names (as produced by {@code cacheDirName}) of such cache groups;
+     *      empty list if there are none.
+     */
+    private List<String> checkCachesWithDisabledWal() {
+        List<String> corruptedCachesDirs = new ArrayList<>();
+
+        for (Integer grpDescId : idxCacheStores.keySet()) {
+            CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptor(grpDescId);
+
+            if (desc == null || !desc.persistenceEnabled())
+                continue;
+
+            boolean localEnabled = cctx.database().walEnabled(grpDescId, true);
+            boolean globalEnabled = cctx.database().walEnabled(grpDescId, false);
+
+            if (localEnabled && globalEnabled)
+                continue;
+
+            File[] files = cacheWorkDir(desc.config()).listFiles();
+
+            // listFiles() returns null if the directory is missing or cannot be listed: nothing to clean up.
+            if (files == null)
+                continue;
+
+            if (Arrays.stream(files).anyMatch(f -> !f.getName().equals(CACHE_DATA_FILENAME)))
+                corruptedCachesDirs.add(cacheDirName(desc.config()));
+        }
+
+        return corruptedCachesDirs;
+    }
+
/** {@inheritDoc} */
@Override public void finishRecover() throws IgniteCheckedException {
try {
@@ -1313,27 +1375,6 @@ public class FilePageStoreManager extends
GridCacheSharedManagerAdapter implemen
return store;
}
- /** {@inheritDoc} */
- @Override public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc) {
- if (grpDesc.persistenceEnabled()) {
- boolean localEnabled =
cctx.database().walEnabled(grpDesc.groupId(), true);
- boolean globalEnabled =
cctx.database().walEnabled(grpDesc.groupId(), false);
-
- if (!localEnabled || !globalEnabled) {
- File dir = cacheWorkDir(grpDesc.config());
-
- assert dir.exists();
-
- boolean res = IgniteUtils.delete(dir);
-
- assert res;
-
- if (!globalEnabled)
- grpDesc.walEnabled(false);
- }
- }
- }
-
/**
* @param pageStoreFileIoFactory File IO factory to override default, may
be used for blocked read-write.
* @param pageStoreV1FileIoFactory File IO factory for reading V1 page
store and for fast touching page files
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index e1cded2..d5a9ed0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -96,6 +96,7 @@ import
org.apache.ignite.internal.util.IgniteExceptionRegistry;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
@@ -365,6 +366,11 @@ public class StandaloneGridKernalContext implements
GridKernalContext {
}
/** {@inheritDoc} */
+ @Override public MaintenanceRegistry maintenanceRegistry() {
+ // This standalone context provides no maintenance registry: callers receive null and must not dereference it.
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteRestProcessor rest() {
return null;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceAction.java
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceAction.java
new file mode 100644
index 0000000..381c660
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.maintenance;
+
+import org.apache.ignite.lang.IgniteExperimental;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An operation executed in the context of a particular {@link MaintenanceTask}.
+ * <p>
+ * Such an operation may resolve the maintenance situation (e.g. remove corrupted data files),
+ * report on another ongoing maintenance action (e.g. the progress of a long-running action)
+ * or cancel another ongoing action.
+ * <p>
+ * The actions available for each task are supplied by its {@link MaintenanceWorkflowCallback};
+ * {@link MaintenanceRegistry#actionsForMaintenanceTask(String)} gives access to them.
+ *
+ * @param <T> Type of the result returned by {@link #execute()}.
+ */
+@IgniteExperimental
+public interface MaintenanceAction<T> {
+    /**
+     * Executes the operations of this maintenance action.
+     *
+     * @return Result of the action.
+     */
+    public T execute();
+
+    /**
+     * @return Mandatory human-readable name of this action. Names must be unique among all actions
+     *      of a single {@link MaintenanceWorkflowCallback}.
+     */
+    public @NotNull String name();
+
+    /**
+     * @return Optional user-readable description of this action.
+     */
+    public @Nullable String description();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
new file mode 100644
index 0000000..b264700
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.maintenance;
+
+import java.util.List;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MaintenanceRegistry} is a service local to each Ignite node
+ * that allows to request performing maintenance actions on that particular node.
+ * <p>
+ * When a node gets into a situation where some specific actions are required,
+ * it enters a special mode called maintenance mode.
+ * In maintenance mode the node doesn't join the rest of the cluster but still allows to connect to it
+ * with the control.{sh|bat} script or via the JMX interface and to perform the needed actions.
+ * <p>
+ * Implementing a new task for maintenance mode requires several pieces of code:
+ * <ul>
+ *     <li>
+ *         First, a component requiring Maintenance Mode should be able to register a new {@link MaintenanceTask}
+ *         with the {@link MaintenanceRegistry#registerMaintenanceTask(MaintenanceTask)} method.
+ *         Registration could happen automatically (e.g. if the component detects some emergency situation
+ *         that requires user intervention) or by user request (e.g. for a planned maintenance that requires
+ *         detaching the node from the rest of the cluster).
+ *     </li>
+ *     <li>
+ *         The component responsible for handling this {@link MaintenanceTask} checks on startup
+ *         whether the task is registered (which means the node should go to Maintenance Mode).
+ *         If the task is found, the component provides {@link MaintenanceRegistry} with its own implementation
+ *         of the {@link MaintenanceWorkflowCallback} interface
+ *         via the {@link MaintenanceRegistry#registerWorkflowCallback(String, MaintenanceWorkflowCallback)} method.
+ *     </li>
+ *     <li>
+ *         {@link MaintenanceWorkflowCallback} should provide {@link MaintenanceRegistry} with
+ *         {@link MaintenanceAction}s that are able to resolve the maintenance task,
+ *         get information about it and so on.
+ *         The logic of these actions is completely up to the component providing them
+ *         and depends only on the particular maintenance task.
+ *     </li>
+ *     <li>
+ *         When the maintenance task is fixed, it should be removed from {@link MaintenanceRegistry}
+ *         with a call to {@link MaintenanceRegistry#unregisterMaintenanceTask(String)}.
+ *     </li>
+ * </ul>
+ */
+@IgniteExperimental
+public interface MaintenanceRegistry {
+    /**
+     * @return {@code True} if any maintenance task was found.
+     */
+    public boolean isMaintenanceMode();
+
+    /**
+     * Registers a {@link MaintenanceTask} locally on the node where the method is called.
+     * <p>
+     * For now it is not allowed to register new Maintenance Tasks in Maintenance Mode,
+     * so this method should be called only when the node operates normally.
+     * This may change in the future, making it possible to create other maintenance tasks
+     * on a node that has already entered Maintenance Mode.
+     * <p>
+     * When a task is registered, the node continues to operate normally
+     * and enters Maintenance Mode only after restart.
+     *
+     * @param task {@link MaintenanceTask} object with maintenance information that needs
+     *      to be stored to the maintenance registry.
+     * @return Previously registered {@link MaintenanceTask} with the same name
+     *      or {@code null} if no task was registered under this name.
+     * @throws IgniteCheckedException If handling or storing the maintenance task failed.
+     */
+    public @Nullable MaintenanceTask registerMaintenanceTask(MaintenanceTask task) throws IgniteCheckedException;
+
+    /**
+     * Deletes the {@link MaintenanceTask} with the given name from the maintenance registry.
+     *
+     * @param maintenanceTaskName Name of the {@link MaintenanceTask} to be deleted.
+     */
+    public void unregisterMaintenanceTask(String maintenanceTaskName);
+
+    /**
+     * Returns an active {@link MaintenanceTask} by its name.
+     * There are active tasks only when the node has entered Maintenance Mode.
+     * <p>
+     * A {@link MaintenanceTask} becomes active when the node enters Maintenance Mode and the task
+     * is not resolved during the maintenance prepare phase.
+     *
+     * @param maintenanceTaskName Maintenance Task name.
+     * @return {@link MaintenanceTask} object for the given name or {@code null} if no maintenance task was found.
+     */
+    @Nullable public MaintenanceTask activeMaintenanceTask(String maintenanceTaskName);
+
+    /**
+     * Registers a {@link MaintenanceWorkflowCallback} for the {@link MaintenanceTask} with the given name.
+     * <p>
+     * The component that registered a {@link MaintenanceTask} (automatically or by user request)
+     * is responsible for providing {@link MaintenanceRegistry} with an implementation of
+     * {@link MaintenanceWorkflowCallback}, from which the registry obtains the {@link MaintenanceAction}s
+     * to be executed for this task and which it uses for a preliminary check before starting maintenance.
+     *
+     * @param maintenanceTaskName Name of the {@link MaintenanceTask} this callback is registered for.
+     * @param cb {@link MaintenanceWorkflowCallback} used by the registry to execute
+     *      the maintenance steps of the workflow.
+     */
+    public void registerWorkflowCallback(@NotNull String maintenanceTaskName, @NotNull MaintenanceWorkflowCallback cb);
+
+    /**
+     * Returns all {@link MaintenanceAction}s provided by a component for the {@link MaintenanceTask}
+     * with the given name.
+     *
+     * @param maintenanceTaskName Name of the Maintenance Task.
+     * @return {@link List} of all available {@link MaintenanceAction}s for the given Maintenance Task.
+     * @throws IgniteException If no Maintenance Task is registered under the provided name.
+     */
+    public List<MaintenanceAction> actionsForMaintenanceTask(String maintenanceTaskName);
+
+    /**
+     * Examines all components to find out whether they need to execute maintenance actions.
+     * <p>
+     * As the user may resolve some maintenance situations by hand while the node was turned off,
+     * a component may find out that no maintenance is needed anymore.
+     * {@link MaintenanceTask Maintenance tasks} of such components are removed
+     * and their {@link MaintenanceAction maintenance actions} are not executed.
+     */
+    public void prepareAndExecuteMaintenance();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceTask.java
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceTask.java
new file mode 100644
index 0000000..49795f1
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.maintenance;
+
+import org.apache.ignite.lang.IgniteExperimental;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Request to handle a maintenance situation on the local node.
+ * <p>
+ * Any component that needs maintenance may create a task, automatically or by user request, and must register it
+ * in the maintenance registry with {@link MaintenanceRegistry#registerMaintenanceTask(MaintenanceTask)}.
+ * From then on the task lifecycle is managed by {@link MaintenanceRegistry}.
+ * <p>
+ * A task carries the unique name of the maintenance situation (e.g. PDS corruption or defragmentation),
+ * its description and optional parameters.
+ * <p>
+ * Once a task is created, the node has to be restarted to enter maintenance mode. In that mode the node
+ * either starts the actions needed to resolve the situation or waits for the user to trigger them.
+ * <p>
+ * Components that may need maintenance as part of their recovery workflow should check the maintenance status
+ * on startup and pass a {@link MaintenanceWorkflowCallback} implementation to
+ * {@link MaintenanceRegistry#registerWorkflowCallback(String, MaintenanceWorkflowCallback)}, so that the registry
+ * can find maintenance actions and start them automatically or by user request.
+ * <p>
+ * A {@link MaintenanceTask} is matched with its {@link MaintenanceWorkflowCallback} by the task name,
+ * which therefore has to be unique among all registered tasks.
+ */
+@IgniteExperimental
+public class MaintenanceTask {
+    /** Task name, unique among all registered tasks. */
+    private final String name;
+
+    /** Human-readable description of the maintenance situation. */
+    private final String description;
+
+    /** Optional parameters for the task's maintenance actions; may be {@code null}. */
+    private final String parameters;
+
+    /**
+     * @param name Mandatory name of the maintenance task; must be unique among all other tasks.
+     * @param description Mandatory description of the maintenance situation.
+     * @param params Optional parameters that may be needed to perform maintenance actions.
+     */
+    public MaintenanceTask(@NotNull String name, @NotNull String description, @Nullable String params) {
+        this.name = name;
+        this.description = description;
+        parameters = params;
+    }
+
+    /**
+     * @return Task name, unique among all registered tasks.
+     */
+    public @NotNull String name() {
+        return this.name;
+    }
+
+    /**
+     * @return Human-readable, non-null description of the task.
+     */
+    public @NotNull String description() {
+        return this.description;
+    }
+
+    /**
+     * @return Optional parameters for the maintenance actions associated with this task, or {@code null}.
+     */
+    public @Nullable String parameters() {
+        return this.parameters;
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceWorkflowCallback.java
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceWorkflowCallback.java
new file mode 100644
index 0000000..26ba2a1
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceWorkflowCallback.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.maintenance;
+
+import java.util.List;
+
+import org.apache.ignite.lang.IgniteExperimental;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decouples {@link MaintenanceRegistry} from the components that may require maintenance.
+ * <p>
+ * A component that may put the node into maintenance mode registers an implementation of this callback via
+ * {@link MaintenanceRegistry#registerWorkflowCallback(String, MaintenanceWorkflowCallback)}.
+ * Through it, {@link MaintenanceRegistry} collects what it needs to know about the component's maintenance
+ * during its workflow without depending on the component's implementation details.
+ */
+@IgniteExperimental
+public interface MaintenanceWorkflowCallback {
+    /**
+     * Lets {@link MaintenanceRegistry} check whether maintenance is still required
+     * for the component that provided this callback.
+     * <p>
+     * The user may have fixed the maintenance situation by hand while the node was down,
+     * so this is checked before going to maintenance mode.
+     *
+     * @return {@code True} if the component still needs maintenance.
+     */
+    public boolean shouldProceedWithMaintenance();
+
+    /**
+     * Supplies the {@link MaintenanceAction}s the user can call to fix the component's maintenance situation
+     * or to get information about ongoing actions.
+     *
+     * @return Non-null and non-empty {@link List} of {@link MaintenanceAction}s.
+     */
+    public @NotNull List<MaintenanceAction> allActions();
+
+    /**
+     * Optional {@link MaintenanceAction} that {@link MaintenanceRegistry} executes automatically
+     * when the node enters maintenance mode.
+     * <p>
+     * If no automatic action is provided, {@link MaintenanceRegistry} waits for the user
+     * to trigger a {@link MaintenanceAction} that fixes the maintenance situation.
+     *
+     * @return {@link MaintenanceAction} for automatic execution, or {@code null} if the maintenance
+     *      situation should not be fixed automatically.
+     */
+    public @Nullable MaintenanceAction automaticAction();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/maintenance/package-info.java
b/modules/core/src/main/java/org/apache/ignite/maintenance/package-info.java
new file mode 100644
index 0000000..71b2a1d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/maintenance/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains public interfaces for maintenance mechanism.
+ */
+package org.apache.ignite.maintenance;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
new file mode 100644
index 0000000..a1d706a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.isolated;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.DiscoveryNotification;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
+import static
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ * Special discovery SPI implementation to start a single-node cluster in "isolated" mode.
+ *
+ * When used, the node doesn't seek or communicate with other nodes that may be running, even in the same JVM.
+ *
+ * At the same time all functions like sending discovery messages work, with the only note that
+ * no messages go to the network: they are processed by the local node asynchronously, in the order they are sent.
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiHistorySupport(true)
+@DiscoverySpiOrderSupport(true)
+public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi {
+    /** Consistent ID of the local node, lazily resolved by {@link #consistentId()}. */
+    private Serializable consistentId;
+
+    /** Grid start time. */
+    private final long startTime = System.currentTimeMillis();
+
+    /** The only node of the cluster, created by {@link #setNodeAttributes(Map, IgniteProductVersion)}. */
+    private IsolatedNode locNode;
+
+    /** Discovery listener. */
+    private DiscoverySpiListener lsnr;
+
+    /** Single thread delivers discovery notifications one at a time, preserving their order. */
+    private final ExecutorService exec = Executors.newSingleThreadExecutor();
+
+    /** {@inheritDoc} */
+    @Override public Serializable consistentId() throws IgniteSpiException {
+        if (consistentId == null) {
+            IgniteConfiguration cfg = ignite.configuration();
+
+            final Serializable cfgId = cfg.getConsistentId();
+
+            consistentId = cfgId != null ? cfgId : UUID.randomUUID();
+        }
+
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return locNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getNode(UUID nodeId) {
+        return locNode != null && locNode.id().equals(nodeId) ? locNode : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return locNode != null && locNode.id().equals(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, IgniteProductVersion ver) {
+        locNode = new IsolatedNode(ignite.configuration().getNodeId(), attrs, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider metricsProvider) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        return startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+        exec.execute(() -> {
+            IgniteFuture<?> fut = lsnr.onDiscovery(new DiscoveryNotification(
+                EVT_DISCOVERY_CUSTOM_EVT,
+                1,
+                locNode,
+                singleton(locNode),
+                null,
+                msg,
+                null));
+
+            // Acknowledge message must be sent after the initial message is processed.
+            fut.listen((f) -> {
+                DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+                if (ack != null) {
+                    exec.execute(() -> lsnr.onDiscovery(new DiscoveryNotification(
+                        EVT_DISCOVERY_CUSTOM_EVT,
+                        1,
+                        locNode,
+                        singleton(locNode),
+                        null,
+                        ack,
+                        null))
+                    );
+                }
+            });
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClientMode() throws IllegalStateException {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
+        exec.execute(() -> {
+            lsnr.onLocalNodeInitialized(locNode);
+
+            lsnr.onDiscovery(new DiscoveryNotification(
+                EVT_NODE_JOINED,
+                1,
+                locNode,
+                singleton(locNode))
+            );
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        exec.shutdownNow();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(final IgniteSpiContext spiCtx) throws IgniteSpiException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean knownNode(UUID nodeId) {
+        return getNode(nodeId) != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientReconnectSupported() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clientReconnect() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allNodesSupport(IgniteFeatures feature) {
+        if (locNode == null)
+            return false;
+
+        return allNodesSupports(singleton(locNode), feature);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean supportsCommunicationFailureResolve() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+        // No-op.
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedNode.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedNode.java
new file mode 100644
index 0000000..d623650
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedNode.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.isolated;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
+
+/**
+ * Local node of an isolated single-node cluster, created by {@link IsolatedDiscoverySpi}.
+ */
+public class IsolatedNode implements IgniteClusterNode {
+ /** Node ID. */
+ private final UUID id;
+
+ /** Product version of the node. */
+ private final IgniteProductVersion ver;
+
+ /** Consistent ID; {@code null} until {@link #setConsistentId(Serializable)} is called. */
+ private Object consistentId;
+
+ /** Node attributes; replaced by {@link #setConsistentId(Serializable)} to include the consistent ID. */
+ private Map<String, Object> attrs;
+
+ /** Node metrics; initialized with a default {@link ClusterMetricsSnapshot}. */
+ private volatile ClusterMetrics metrics = new ClusterMetricsSnapshot();
+
+ /** Cache metrics (presumably keyed by cache ID - TODO confirm); never {@code null}. */
+ private volatile Map<Integer, CacheMetrics> cacheMetrics =
Collections.emptyMap();
+
+ /**
+ * @param id Node ID.
+ * @param attrs Node attributes; stored via {@code U.sealMap(attrs)}.
+ * @param ver Node version.
+ */
+ public IsolatedNode(UUID id, Map<String, Object> attrs,
IgniteProductVersion ver) {
+ this.id = id;
+ this.attrs = U.sealMap(attrs);
+ this.ver = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object consistentId() {
+ return consistentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T attribute(String name) {
+ return (T)attrs.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterMetrics metrics() {
+ return metrics;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> attributes() {
+ return attrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> addresses() {
+ return Collections.singleton("127.0.0.1");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> hostNames() {
+ return Collections.singleton("localhost");
+ }
+
+ /** {@inheritDoc} */
+ @Override public long order() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteProductVersion version() {
+ return ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocal() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDaemon() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClient() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setConsistentId(Serializable consistentId) {
+ this.consistentId = consistentId;
+
+ final Map<String, Object> map = new HashMap<>(attrs);
+
+ map.put(ATTR_NODE_CONSISTENT_ID, consistentId);
+
+ attrs = Collections.unmodifiableMap(map);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMetrics(ClusterMetrics metrics) {
+ this.metrics = metrics;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+ return cacheMetrics;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setCacheMetrics(Map<Integer, CacheMetrics>
cacheMetrics) {
+ this.cacheMetrics = cacheMetrics != null ? cacheMetrics :
Collections.emptyMap();
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
index 2dbf804..2ed03ff 100644
---
a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -68,7 +69,7 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
public boolean persistenceEnabled;
/** Add additional non-persistence data region. */
- public boolean addtiotionalMemRegion;
+ public boolean additionalMemRegion;
/** Filter node. */
public boolean filterNode;
@@ -86,7 +87,7 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
.setAffinity(new RendezvousAffinityFunction(false, 15))
.setBackups(BACKUPS));
- if (addtiotionalMemRegion) {
+ if (additionalMemRegion) {
cfg.setCacheConfiguration(cfg.getCacheConfiguration()[0],
new CacheConfiguration(MEM_REGION_CACHE)
.setDataRegionName(MEM_REGION)
@@ -116,12 +117,15 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
+
cleanPersistenceDir();
}
@@ -249,7 +253,7 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
@Test
public void testComplexCompatibilityInMemory() throws Exception {
persistenceEnabled = false;
- addtiotionalMemRegion = false;
+ additionalMemRegion = false;
IgniteEx crd = startGrid(0);
@@ -320,16 +324,16 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
* Trigger rebalance when dynamic caches stop/start.
*
* @param persistence Persistent flag.
- * @param addtiotionalRegion Use additional (non default) region.
+ * @param additionalRegion Use additional (non default) region.
* @throws Exception If failed.
*/
- public void testRebalanceDynamicCache(boolean persistence, boolean
addtiotionalRegion) throws Exception {
+ public void testRebalanceDynamicCache(boolean persistence, boolean
additionalRegion) throws Exception {
persistenceEnabled = persistence;
- addtiotionalMemRegion = addtiotionalRegion;
+ additionalMemRegion = additionalRegion;
IgniteEx ignite0 = startGrids(NODES_CNT);
- ignite0.cluster().active(true);
+ ignite0.cluster().state(ClusterState.ACTIVE);
grid(1).close();
@@ -371,14 +375,14 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
* Trigger rebalance when non-blt node left topology.
*
* @param persistence Persistent flag.
- * @param addtiotionalRegion Use additional (non default) region.
+ * @param additionalRegion Use additional (non default) region.
* @param fail If true node forcibly falling.
* @throws Exception If failed.
*/
- public void testRebalanceNoneBltNode(boolean persistence, boolean
addtiotionalRegion,
+ public void testRebalanceNoneBltNode(boolean persistence, boolean
additionalRegion,
boolean fail) throws Exception {
persistenceEnabled = persistence;
- addtiotionalMemRegion = addtiotionalRegion;
+ additionalMemRegion = additionalRegion;
IgniteEx ignite0 = startGrids(NODES_CNT);
@@ -445,7 +449,7 @@ public class RebalanceCancellationTest extends
GridCommonAbstractTest {
*/
public void testRebalanceFilteredNode(boolean persistence, boolean
addtiotionalRegion) throws Exception {
persistenceEnabled = persistence;
- addtiotionalMemRegion = addtiotionalRegion;
+ additionalMemRegion = addtiotionalRegion;
filterNode = true;
IgniteEx ignite0 = startGrids(NODES_CNT);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
index 9069978..9eb3b7c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@@ -26,9 +27,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.junit.Ignore;
@@ -36,6 +42,8 @@ import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
/**
* Concurrent and advanced tests for WAL state change.
@@ -64,6 +72,143 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
}
/**
+ * Verifies that node with consistent partitions (fully synchronized with
disk on a previous checkpoint)
+ * starts successfully even if WAL for that cache group is globally
disabled.
+ *
+ * <p>
+ * Test scenario:
+ * </p>
+ * <ol>
+ * <li>
+ * Start a cluster from one server node, activate cluster.
+ * </li>
+ * <li>
+ * Create new cache. Disable WAL for the cache and put some data
to it.
+ * </li>
+ * <li>
+ * Trigger checkpoint and wait for it finish. Restart node.
+ * </li>
+ * <li>
+ * Verify that node starts successfully and data is presented in
the cache.
+ * </li>
+ * </ol>
+ *
+ * @throws Exception If failed.
+ */
+    @Test
+    public void testConsistentDataPreserved() throws Exception {
+        IgniteEx srv = startGrid(config(SRV_1, false, false));
+
+        srv.cluster().state(ACTIVE);
+
+        IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME, PARTITIONED, TRANSACTIONAL));
+
+        srv.cluster().disableWal(CACHE_NAME);
+
+        for (int i = 0; i < 10; i++)
+            cache1.put(i, i);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)srv.context().cache().context().database();
+
+        db.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
+
+        stopGrid(SRV_1);
+
+        srv = startGrid(config(SRV_1, false, false));
+
+        assertForAllNodes(CACHE_NAME, false);
+
+        cache1 = srv.cache(CACHE_NAME);
+
+        for (int i = 0; i < 10; i++)
+            assertEquals(Integer.valueOf(i), cache1.get(i));
+    }
+
+ /**
+ * If user manually clears corrupted files when node was down, node
detects this and not enters maintenance mode
+ * (although still need another restart to get back to normal operations).
+ *
+ * <p>
+ * Test scenario:
+ * <ol>
+ * <li>
+ * Start server node, create cache, disable WAL for the cache,
put some keys to it.
+ * </li>
+ * <li>
+ * Stop server node, remove checkpoint end markers from cp
directory
+ * to make node think it has failed in the middle of
checkpoint.
+ * </li>
+ * <li>
+ * Start the node, verify it fails to start because of
corrupted PDS of the cache.
+ * </li>
+ * <li>
+ * Clean data directory of the cache, start node again, verify
it doesn't report maintenance mode.
+ * </li>
+ * <li>
+ * Restart node and verify it is in normal operations mode.
+ * </li>
+ * </ol>
+ * </p>
+ *
+ *
+ * @throws Exception If failed.
+ */
+    @Test
+    public void testMaintenanceIsSkippedIfWasFixedManuallyOnDowntime() throws Exception {
+        IgniteEx srv = startGrid(config(SRV_1, false, false));
+
+        File cacheToClean = cacheDir(srv, CACHE_NAME);
+
+        // Checkpoint markers are stored in the "cp" folder of the node PDS directory,
+        // which is the parent of every cache directory.
+        File ig0CpDir = new File(cacheToClean.getParentFile(), "cp");
+
+        srv.cluster().state(ACTIVE);
+
+        IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME, PARTITIONED, TRANSACTIONAL));
+
+        srv.cluster().disableWal(CACHE_NAME);
+
+        for (int i = 0; i < 10; i++)
+            cache1.put(i, i);
+
+        stopAllGrids(true);
+
+        File[] cpMarkers = ig0CpDir.listFiles();
+        assertNotNull("Checkpoint directory not found: " + ig0CpDir, cpMarkers);
+
+        // Remove checkpoint end markers to make the node think it failed in the middle of a checkpoint.
+        for (File cpMark : cpMarkers) {
+            if (cpMark.getName().contains("-END"))
+                assertTrue("Failed to delete checkpoint marker: " + cpMark, cpMark.delete());
+        }
+
+        // Node should fail as its PDS may be corrupted because of disabled WAL.
+        GridTestUtils.assertThrows(null,
+            () -> startGrid(config(SRV_1, false, false)),
+            Exception.class,
+            null);
+
+        cleanCacheDir(cacheToClean);
+
+        // Node should start successfully and not enter maintenance mode: the maintenance record is cleaned
+        // automatically because the corrupted PDS was deleted during downtime.
+        srv = startGrid(config(SRV_1, false, false));
+        assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode());
+
+        stopAllGrids(false);
+
+        // After restart node works in normal mode even without executing maintenance action to clear corrupted PDS.
+        srv = startGrid(config(SRV_1, false, false));
+        assertFalse(srv.context().maintenanceRegistry().isMaintenanceMode());
+
+        srv.cluster().state(ACTIVE);
+
+        cache1 = srv.getOrCreateCache(CACHE_NAME);
+        assertEquals(0, cache1.size());
+    }
+
+ /**
* Test cache cleanup on restart.
*
* @throws Exception If failed.
@@ -72,7 +217,9 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
public void testCacheCleanup() throws Exception {
Ignite srv = startGrid(config(SRV_1, false, false));
- srv.cluster().active(true);
+ File cacheToClean = cacheDir(srv, CACHE_NAME_2);
+
+ srv.cluster().state(ACTIVE);
IgniteCache cache1 = srv.getOrCreateCache(cacheConfig(CACHE_NAME,
PARTITIONED, TRANSACTIONAL));
IgniteCache cache2 = srv.getOrCreateCache(cacheConfig(CACHE_NAME_2,
PARTITIONED, TRANSACTIONAL));
@@ -118,9 +265,11 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
stopAllGrids(true);
+ cleanCacheDir(cacheToClean);
+
srv = startGrid(config(SRV_1, false, false));
- srv.cluster().active(true);
+ srv.cluster().state(ACTIVE);
cache1 = srv.cache(CACHE_NAME);
cache2 = srv.cache(CACHE_NAME_2);
@@ -132,6 +281,24 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
assertEquals(0, cache2.size());
}
+    /** Resolves the persistence directory of cache {@code cacheName} on node {@code ig}. */
+    private File cacheDir(Ignite ig, String cacheName) throws IgniteCheckedException {
+        IgniteEx node = (IgniteEx)ig;
+
+        String nodeFolder = node.context().pdsFolderResolver().resolveFolders().folderName();
+        File dbRoot = U.resolveWorkDirectory(node.configuration().getWorkDirectory(), "db", false);
+
+        return new File(new File(dbRoot, nodeFolder), "cache-" + cacheName);
+    }
+
+    /** Deletes every file in {@code cacheDir} except the one named {@code CACHE_DATA_FILENAME}; fails on error. */
+    private void cleanCacheDir(File cacheDir) {
+        File[] files = cacheDir.listFiles();
+        assertNotNull("Cache directory not found: " + cacheDir, files);
+        for (File f : files)
+            assertTrue("Failed to delete " + f, f.getName().equals(CACHE_DATA_FILENAME) || f.delete());
+    }
+
/**
* Test simple node join.
*
@@ -162,7 +329,7 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
// Start node and disable WAL.
Ignite srv = startGrid(config(SRV_1, false, crdFiltered));
- srv.cluster().active(true);
+ srv.cluster().state(ACTIVE);
srv.getOrCreateCache(cacheConfig(PARTITIONED));
assertForAllNodes(CACHE_NAME, true);
@@ -173,7 +340,9 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
}
// Start other nodes.
- startGrid(config(SRV_2, false, false));
+ IgniteEx ig2 = startGrid(config(SRV_2, false, false));
+
+ File ig2CacheDir = cacheDir(ig2, CACHE_NAME);
if (crdFiltered)
srv.cluster().disableWal(CACHE_NAME);
@@ -196,6 +365,8 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
assertForAllNodes(CACHE_NAME, true);
}
+ cleanCacheDir(ig2CacheDir);
+
// Start other nodes again.
startGrid(config(SRV_2, false, false));
@@ -244,7 +415,7 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
Ignite cli = startGrid(config(CLI, true, false));
- cli.cluster().active(true);
+ cli.cluster().state(ACTIVE);
cli.getOrCreateCache(cacheConfig(PARTITIONED));
@@ -268,7 +439,12 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
victimName = SRV_2;
try {
+ File cacheDir = cacheDir(grid(victimName), CACHE_NAME);
+
stopGrid(victimName);
+
+ cleanCacheDir(cacheDir);
+
startGrid(config(victimName, false, false));
Thread.sleep(200);
@@ -313,7 +489,7 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
final Ignite srv = startGrid(config(SRV_1, false, false));
Ignite cli = startGrid(config(CLI, true, false));
- cli.cluster().active(true);
+ cli.cluster().state(ACTIVE);
cli.getOrCreateCache(cacheConfig(PARTITIONED));
@@ -373,7 +549,7 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
final Ignite srv = startGrid(config(SRV_1, false, false));
Ignite cli = startGrid(config(CLI, true, false));
- cli.cluster().active(true);
+ cli.cluster().state(ACTIVE);
srv.createCache(cacheConfig(PARTITIONED));
@@ -439,7 +615,7 @@ public class WalModeChangeAdvancedSelfTest extends
WalModeChangeCommonAbstractSe
final Ignite cacheCli = startGrid(config(CLI_2, true, false));
- cacheCli.cluster().active(true);
+ cacheCli.cluster().state(ACTIVE);
final IgniteCache cache =
cacheCli.getOrCreateCache(cacheConfig(PARTITIONED));
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 1c40db3..03ad4dd 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -22,13 +22,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.file.OpenOption;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -56,7 +58,11 @@ import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
@@ -65,6 +71,9 @@ import
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
@@ -112,6 +121,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
// Test checks internal state before and after rebalance, so
it is configured to be triggered manually
.setRebalanceDelay(-1)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
.setBackups(dfltCacheBackupCnt),
new CacheConfiguration(REPL_CACHE)
@@ -253,7 +263,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(3);
ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, Integer> cache2 = ignite.cache(REPL_CACHE);
@@ -338,7 +348,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(3);
ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
ignite.cluster().setBaselineTopology(3);
@@ -393,7 +403,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(3);
ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
@@ -436,7 +446,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(nodeCnt);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.cache(REPL_CACHE);
@@ -507,7 +517,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(3);
ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
@@ -547,60 +557,218 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
}
/**
+ * Node doesn't delete consistent PDS when WAL was turned off
automatically (disable WAL during rebalancing feature).
+ *
+ * <p>
+ * Test scenario:
+ * <ol>
+ * <li>
+ * Two server nodes are started, cluster is activated, baseline is
set. 2500 keys are put into cache.
+ * </li>
+ * <li>
+ * Checkpoint is started and finished on both nodes.
+ * </li>
+ * <li>
+ * Node n1 is stopped, another 2500 keys are put into the same
cache.
+ * </li>
+ * <li>
+ * Node n1 is started back so rebalancing is triggered from n0 to
n1. WAL is turned off on n1 automatically.
+ * </li>
+ * <li>
+ * Both nodes are stopped without checkpoint.
+ * </li>
+ * <li>
+ * Node n1 is started and activated. Lost partitions are reset.
+ * </li>
+ * <li>
+ * First 2500 keys are found in cache thus PDS wasn't removed on
restart.
+ * </li>
+ * <li>
+ * Second 2500 keys are not found in cache as WAL was disabled
during rebalancing
+ * and no checkpoint was triggered.
+ * </li>
+ * </ol>
+ * </p>
+ *
* @throws Exception If failed.
*/
@Test
- public void testDataClearedAfterRestartWithDisabledWal() throws Exception {
- Ignite ignite = startGrid(0);
+ public void testConsistentPdsIsNotClearedAfterRestartWithDisabledWal()
throws Exception {
+ // One backup: with two nodes, n1 is expected to hold a copy of every partition of the cache.
+ dfltCacheBackupCnt = 1;
- ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ IgniteEx ig0 = startGrid(0);
+ IgniteEx ig1 = startGrid(1);
- IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+ ig0.cluster().baselineAutoAdjustEnabled(false);
+ ig0.cluster().state(ACTIVE);
- int keysCnt = getKeysCount();
+ IgniteCache<Integer, Integer> cache = ig0.cache(DEFAULT_CACHE_NAME);
- for (int k = 0; k < keysCnt; k++)
+ for (int k = 0; k < 2500; k++)
cache.put(k, k);
- IgniteEx newIgnite = startGrid(1);
+ GridCacheDatabaseSharedManager dbMrg0 =
(GridCacheDatabaseSharedManager) ig0.context().cache().context().database();
+ GridCacheDatabaseSharedManager dbMrg1 =
(GridCacheDatabaseSharedManager) ig1.context().cache().context().database();
- newIgnite.cluster().setBaselineTopology(2);
+ // Wait for a finished checkpoint on both nodes so the first 2500 keys are persisted in page store files.
+ dbMrg0.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
+ dbMrg1.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
- // Await fully exchange complete.
- awaitExchange(newIgnite);
+ stopGrid(1);
- CacheGroupContext grpCtx =
newIgnite.cachex(DEFAULT_CACHE_NAME).context().group();
+ // These keys are written only while n1 is down, so n1 misses them.
+ for (int k = 2500; k < 5000; k++)
+ cache.put(k, k);
- assertFalse(grpCtx.localWalEnabled());
+ // n1 rejoins with outdated data; per the scenario above its WAL is expected to be disabled for rebalancing.
+ ig1 = startGrid(1);
- stopGrid(1);
- stopGrid(0);
+ awaitExchange(ig1);
- newIgnite = startGrid(1);
+ // Stop both nodes before the rebalanced data is covered by a checkpoint.
+ stopAllGrids(false);
- newIgnite.cluster().active(true);
+ ig1 = startGrid(1);
+ ig1.cluster().state(ACTIVE);
- newIgnite.cluster().setBaselineTopology(newIgnite.cluster().nodes());
+ // n0 stays down, so partitions may be reported as lost; reset them to be able to read from the cache.
+ ig1.resetLostPartitions(Arrays.asList(DEFAULT_CACHE_NAME));
- cache = newIgnite.cache(DEFAULT_CACHE_NAME);
+ awaitExchange(ig1);
- Collection<Integer> lostParts = cache.lostPartitions();
+ cache = ig1.cache(DEFAULT_CACHE_NAME);
- Set<Integer> keys = new TreeSet<>();
+ // Keys covered by the forced checkpoint must survive: the consistent PDS was not cleared.
+ for (int k = 0; k < 2500; k++)
+ assertTrue(cache.containsKey(k));
- for (int k = 0; k < keysCnt; k++) {
- // Skip lost partitions.
- if
(lostParts.contains(newIgnite.affinity(DEFAULT_CACHE_NAME).partition(k)))
- continue;
+ // Keys received while WAL was disabled and never checkpointed must be gone.
+ for (int k = 2500; k < 5000; k++)
+ assertFalse(cache.containsKey(k));
+ }
+
+    /**
+     * Test is opposite to {@link #testConsistentPdsIsNotClearedAfterRestartWithDisabledWal()}
+     *
+     * <p>
+     * Test scenario:
+     * <ol>
+     *     <li>
+     *         Two server nodes are started, cluster is activated, baseline is set. 2500 keys are put into cache.
+     *     </li>
+     *     <li>
+     *         Checkpoint is started and finished on both nodes.
+     *     </li>
+     *     <li>
+     *         Node n1 is stopped, another 2500 keys are put into the same cache.
+     *     </li>
+     *     <li>
+     *         Node n1 is started back so rebalancing is triggered from n0 to n1. WAL is turned off on n1 automatically.
+     *     </li>
+     *     <li>
+     *         Both nodes are stopped without checkpoint.
+     *     </li>
+     *     <li>
+     *         All CP END markers are removed on node n1 so the node will think it crashed during checkpoint
+     *         on the next restart.
+     *     </li>
+     *     <li>
+     *         Node n1 fails to start as it sees potentially corrupted files of one cache. Manual action is required.
+     *     </li>
+     *     <li>
+     *         On the next start n1 runs isolated (each node sees a single-node cluster); the clean cache stores
+     *         maintenance action is executed on it and the maintenance task is unregistered.
+     *     </li>
+     *     <li>
+     *         After a restart n1 is activated alone and none of the first 2500 keys are found in the cache.
+     *     </li>
+     * </ol>
+     * </p>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPdsWithBrokenBinaryConsistencyIsClearedAfterRestartWithDisabledWal() throws Exception {
+        dfltCacheBackupCnt = 1;
+
+        IgniteEx ig0 = startGrid(0);
+        IgniteEx ig1 = startGrid(1);
+
+        String ig1Folder = ig1.context().pdsFolderResolver().resolveFolders().folderName();
+        File dbDir = U.resolveWorkDirectory(ig1.configuration().getWorkDirectory(), "db", false);
+
+        File ig1LfsDir = new File(dbDir, ig1Folder);
+        File ig1CpDir = new File(ig1LfsDir, "cp");
+
+        ig0.cluster().baselineAutoAdjustEnabled(false);
+        ig0.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = ig0.cache(DEFAULT_CACHE_NAME);
+
+        for (int k = 0; k < 2500; k++)
+            cache.put(k, k);
-            keys.add(k);
+        GridCacheDatabaseSharedManager dbMgr0 = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+        GridCacheDatabaseSharedManager dbMgr1 = (GridCacheDatabaseSharedManager)ig1.context().cache().context().database();
+
+        dbMgr0.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
+        dbMgr1.forceCheckpoint("cp").futureFor(CheckpointState.FINISHED).get();
+
+        stopGrid(1);
+
+        for (int k = 2500; k < 5000; k++)
+            cache.put(k, k);
+
+        ig1 = startGrid(1);
+
+        awaitExchange(ig1);
+
+        stopAllGrids(false);
+
+        ig0 = startGrid(0);
+
+        File[] cpMarkers = ig1CpDir.listFiles();
+
+        // File#listFiles() returns null when the directory is missing or unreadable.
+        assertNotNull("Checkpoint markers directory is not accessible: " + ig1CpDir, cpMarkers);
+
+        boolean endMarkerRemoved = false;
+
+        // Removing END markers makes n1 treat its checkpoints as unfinished on the next start.
+        for (File cpMark : cpMarkers) {
+            if (cpMark.getName().contains("-END")) {
+                // An undeleted marker would silently invalidate the rest of the scenario.
+                assertTrue("Failed to delete checkpoint marker: " + cpMark, cpMark.delete());
+
+                endMarkerRemoved = true;
+            }
         }
-        for (Integer k : keys)
-            assertFalse("k=" + k + ", v=" + cache.get(k), cache.containsKey(k));
+
+        assertTrue("No checkpoint END marker found in " + ig1CpDir, endMarkerRemoved);
+
+        assertThrows(null, () -> startGrid(1), Exception.class, null);
+
+        ig1 = startGrid(1);
+
+        assertEquals(1, ig0.cluster().nodes().size());
+        assertEquals(1, ig1.cluster().nodes().size());
+
+        AtomicBoolean actionNotFound = new AtomicBoolean(false);
+
+        ig1.compute().run(new IgniteRunnable() {
+            @IgniteInstanceResource
+            private Ignite ig;
+
+            @Override public void run() {
+                MaintenanceRegistry mntcRegistry = ((IgniteEx)ig).context().maintenanceRegistry();
+
+                List<MaintenanceAction> actions = mntcRegistry
+                    .actionsForMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
+
+                Optional<MaintenanceAction> optional = actions
+                    .stream()
+                    .filter(a -> a.name().equals(CleanCacheStoresMaintenanceAction.ACTION_NAME)).findFirst();
+
+                if (!optional.isPresent())
+                    actionNotFound.set(true);
+                else
+                    optional.get().execute();
+
+                mntcRegistry.unregisterMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);
+            }
+        });
+
+        assertFalse("Action to clear corrupted PDS is not found", actionNotFound.get());
+
+        stopAllGrids();
+
+        ig1 = startGrid(1);
+
+        ig1.cluster().state(ACTIVE);
+
+        assertEquals(1, ig1.cluster().nodes().size());
+
+        cache = ig1.cache(DEFAULT_CACHE_NAME);
-        assertFalse(cache.containsKeys(keys));
+        for (int k = 0; k < 2500; k++)
+            assertFalse(cache.containsKey(k));
     }
/**
@@ -611,7 +779,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest
extends GridCommonAbstr
Ignite ignite = startGrids(4);
ignite.cluster().baselineAutoAdjustEnabled(false);
- ignite.cluster().active(true);
+ ignite.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenanceRegistrySimpleTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenanceRegistrySimpleTest.java
new file mode 100644
index 0000000..5866d43
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MaintenanceRegistrySimpleTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceTask;
+import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Simple unit test to cover basic MaintenanceRegistry functionality like action validations,
+ * maintenance tasks structure etc.
+ */
+public class MaintenanceRegistrySimpleTest {
+    /** */
+    private final IgniteLogger log = new GridTestLog4jLogger();
+
+    /** */
+    @Before
+    public void beforeTest() throws Exception {
+        cleanMaintenanceRegistryFile();
+    }
+
+    /** */
+    @After
+    public void afterTest() throws Exception {
+        cleanMaintenanceRegistryFile();
+    }
+
+    /**
+     * Deletes all {@code *.mntc} files from the default work directory so that maintenance tasks persisted
+     * by one test never leak into the next one.
+     *
+     * @throws Exception If the default work directory cannot be resolved or a file cannot be deleted.
+     */
+    private void cleanMaintenanceRegistryFile() throws Exception {
+        String dfltWorkDir = U.defaultWorkDirectory();
+
+        File[] files = new File(dfltWorkDir).listFiles();
+
+        // File#listFiles() returns null if the directory does not exist or cannot be read: nothing to clean.
+        if (files == null)
+            return;
+
+        for (File f : files) {
+            // Fail fast: a leftover file would carry tasks registered by a previous test into the next one.
+            if (f.getName().endsWith(".mntc"))
+                assertTrue("Failed to delete maintenance registry file: " + f.getAbsolutePath(), f.delete());
+        }
+    }
+
+    /**
+     * Creates a standalone kernal context whose PDS folder resolves to the default work directory.
+     *
+     * @param persistenceEnabled Whether persistence is enabled for the default data region.
+     * @return Kernal context to construct {@link MaintenanceProcessor} with.
+     * @throws IgniteCheckedException If the context cannot be initialized.
+     */
+    private GridKernalContext initContext(boolean persistenceEnabled) throws IgniteCheckedException {
+        String dfltWorkDir = U.defaultWorkDirectory();
+
+        GridKernalContext kctx = new StandaloneGridKernalContext(log, null, null) {
+            @Override protected IgniteConfiguration prepareIgniteConfiguration() {
+                IgniteConfiguration cfg = super.prepareIgniteConfiguration();
+
+                cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled)
+                ));
+
+                return cfg;
+            }
+
+            @Override public PdsFoldersResolver pdsFolderResolver() {
+                return new PdsFoldersResolver() {
+                    @Override public PdsFolderSettings resolveFolders() {
+                        return new PdsFolderSettings(new File(dfltWorkDir), U.maskForFileName(""));
+                    }
+                };
+            }
+        };
+
+        return kctx;
+    }
+
+    /**
+     * {@link MaintenanceTask} could be replaced with new parameters after registration, old task is deleted.
+     *
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    @Test
+    public void testMaintenanceTaskReplacement() throws IgniteCheckedException {
+        String name0 = "taskName0";
+        String descr = "description";
+        String oldParams = "oldParams";
+        String newParams = "newParams";
+
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        proc.start();
+
+        assertFalse(proc.isMaintenanceMode());
+
+        proc.registerMaintenanceTask(new MaintenanceTask(name0, descr, oldParams));
+        proc.registerMaintenanceTask(new MaintenanceTask(name0, descr, newParams));
+
+        proc.stop(false);
+
+        // Restart forces the processor to read the persisted tasks back.
+        proc.start();
+
+        assertTrue(proc.isMaintenanceMode());
+
+        MaintenanceTask task = proc.activeMaintenanceTask(name0);
+
+        assertNotNull(task);
+        assertEquals(newParams, task.parameters());
+    }
+
+    /**
+     * Registered {@link MaintenanceTask} can be deleted before node entered Maintenance Mode (before node restart).
+     *
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    @Test
+    public void testDeleteMaintenanceTask() throws IgniteCheckedException {
+        String name = "name0";
+
+        MaintenanceTask task = new MaintenanceTask(name, "description", null);
+
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        proc.start();
+
+        proc.registerMaintenanceTask(task);
+
+        assertFalse(proc.isMaintenanceMode());
+
+        proc.unregisterMaintenanceTask(name);
+
+        proc.stop(false);
+
+        proc.start();
+
+        assertNull(proc.activeMaintenanceTask(name));
+        assertFalse(proc.isMaintenanceMode());
+    }
+
+    /**
+     * Maintenance actions provided by maintenance callback should all have unique names.
+     *
+     * @throws IgniteCheckedException If initialization failed.
+     */
+    @Test
+    public void testMaintenanceCallbackProvidesActionsWithUniqueNames() throws IgniteCheckedException {
+        String actionName0 = "action0";
+        String actionName1 = "action1";
+
+        String name0 = "name0";
+        String name1 = "name1";
+
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        // Attempt to register callback with actions with non-unique names throws exception.
+        GridTestUtils.assertThrows(log, () ->
+            proc.registerWorkflowCallback(name0, new SimpleMaintenanceCallback(
+                Arrays.asList(new SimpleAction(actionName0), new SimpleAction(actionName0))
+            )),
+            IgniteException.class,
+            "unique names: " + actionName0 + ", " + actionName0);
+
+        // Attempt to register callback with actions with unique names finishes successfully.
+        proc.registerWorkflowCallback(name1, new SimpleMaintenanceCallback(
+            Arrays.asList(new SimpleAction(actionName0), new SimpleAction(actionName1))));
+    }
+
+    /**
+     * Smoke test for writing and restoring back maintenance tasks to/from file.
+     *
+     * @throws IgniteCheckedException If initialization of Maintenance Processor failed.
+     */
+    @Test
+    public void testMultipleMaintenanceTasks() throws IgniteCheckedException {
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        String task0Name = "name0";
+        String task1Name = "name1";
+
+        String desc0 = "task0";
+        String desc1 = "task1";
+
+        String params0 = "task0_param";
+        String params1 = "task1_param";
+
+        proc.start();
+
+        proc.registerMaintenanceTask(new MaintenanceTask(task0Name, desc0, params0));
+        proc.registerMaintenanceTask(new MaintenanceTask(task1Name, desc1, params1));
+
+        proc.stop(false);
+
+        proc.start();
+
+        MaintenanceTask task0 = proc.activeMaintenanceTask(task0Name);
+        MaintenanceTask task1 = proc.activeMaintenanceTask(task1Name);
+
+        assertNotNull(task0);
+        assertNotNull(task1);
+
+        assertEquals(desc0, task0.description());
+        assertEquals(desc1, task1.description());
+
+        assertEquals(params0, task0.parameters());
+        assertEquals(params1, task1.parameters());
+    }
+
+    /**
+     * Maintenance task registered with {@code null} parameters is restored from file with {@code null} parameters,
+     * while parameters of other tasks are restored as they were registered.
+     *
+     * @throws IgniteCheckedException If initialization of Maintenance Processor failed.
+     */
+    @Test
+    public void testMaintenanceTasksWithoutParameters() throws IgniteCheckedException {
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        String task0Name = "name0";
+        String task1Name = "name1";
+
+        String desc0 = "task0";
+        String desc1 = "task1";
+
+        String params0 = "task0_param";
+
+        // Call to initialize file for maintenance tasks.
+        proc.start();
+
+        proc.registerMaintenanceTask(new MaintenanceTask(task0Name, desc0, params0));
+        proc.registerMaintenanceTask(new MaintenanceTask(task1Name, desc1, null));
+
+        proc.stop(false);
+
+        // Call to force Maintenance Processor to read that file and fill internal collection of maintenance tasks.
+        proc.start();
+
+        MaintenanceTask task0 = proc.activeMaintenanceTask(task0Name);
+        MaintenanceTask task1 = proc.activeMaintenanceTask(task1Name);
+
+        assertNotNull(task0);
+        assertNotNull(task1);
+
+        assertEquals(params0, task0.parameters());
+        assertNull(task1.parameters());
+    }
+
+    /**
+     * Name of maintenance action should contain only alphanumeric and underscore symbols.
+     *
+     * @throws IgniteCheckedException If initialization of Maintenance Processor failed.
+     */
+    @Test
+    public void testMaintenanceActionNameSymbols() throws IgniteCheckedException {
+        MaintenanceProcessor proc = new MaintenanceProcessor(initContext(true));
+
+        String name0 = "name0";
+        String wrongName = "wrong*Name";
+
+        GridTestUtils.assertThrows(log,
+            () -> proc.registerWorkflowCallback(name0,
+                new SimpleMaintenanceCallback(Arrays.asList(new SimpleAction(wrongName)))),
+            IgniteException.class,
+            "alphanumeric");
+    }
+
+    /** Callback that always proceeds with maintenance and exposes a fixed list of actions. */
+    private static final class SimpleMaintenanceCallback implements MaintenanceWorkflowCallback {
+        /** */
+        private final List<MaintenanceAction> actions = new ArrayList<>();
+
+        /**
+         * @param actions Actions to expose via {@link #allActions()}.
+         */
+        SimpleMaintenanceCallback(List<MaintenanceAction> actions) {
+            this.actions.addAll(actions);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean shouldProceedWithMaintenance() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull List<MaintenanceAction> allActions() {
+            return actions;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable MaintenanceAction automaticAction() {
+            return null;
+        }
+    }
+
+    /** No-op action identified only by its name. */
+    private static final class SimpleAction implements MaintenanceAction<Void> {
+        /** */
+        private final String name;
+
+        /**
+         * @param name Action name.
+         */
+        private SimpleAction(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void execute() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @NotNull String name() {
+            return name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable String description() {
+            return null;
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
index cc7c6a9..3c7a2ed 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java
@@ -211,11 +211,6 @@ public class NoOpPageStoreManager implements
IgnitePageStoreManager {
}
/** {@inheritDoc} */
- @Override public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) {
// No-op.
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 375efe7..00fc33a 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheStartStopWithFreqCheckpointTest;
@@ -32,6 +33,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS
import
org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest;
import
org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest;
import
org.apache.ignite.internal.processors.cache.persistence.LocalWalModeNoChangeDuringRebalanceOnNonNodeAssignTest;
+import
org.apache.ignite.internal.processors.cache.persistence.MaintenanceRegistrySimpleTest;
import
org.apache.ignite.internal.processors.cache.persistence.WALPreloadingWithCompactionTest;
import
org.apache.ignite.internal.processors.cache.persistence.WalPreloadingConcurrentTest;
import
org.apache.ignite.internal.processors.cache.persistence.baseline.ClientAffinityAssignmentWithBaselineTest;
@@ -134,6 +136,9 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite,
ClusterStateChangeEventTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
ClusterStateChangeEventWithPersistenceTest.class, ignoredTests);
+ // Maintenance tests
+ GridTestUtils.addTestIfNeeded(suite,
MaintenanceRegistrySimpleTest.class, ignoredTests);
+
return suite;
}