Repository: ignite Updated Branches: refs/heads/master fc09631c7 -> 14055683a
IGNITE-6832 Properly handle IO errors while checkpointing - Fixes #3394. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/14055683 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/14055683 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/14055683 Branch: refs/heads/master Commit: 14055683accb367672fd32f4dbd3f136f4a49f28 Parents: fc09631 Author: Pavel Kovalenko <[email protected]> Authored: Wed Jan 24 18:02:09 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jan 24 18:02:09 2018 +0300 ---------------------------------------------------------------------- .../configuration/DataStorageConfiguration.java | 6 +- .../ignite/internal/GridKernalContext.java | 14 + .../ignite/internal/GridKernalContextImpl.java | 13 + .../org/apache/ignite/internal/IgnitionEx.java | 33 ++ .../apache/ignite/internal/NodeInvalidator.java | 53 +++ .../pagemem/store/IgnitePageStoreManager.java | 2 +- .../internal/pagemem/wal/StorageException.java | 16 +- .../GridCacheDatabaseSharedManager.java | 12 +- .../cache/persistence/file/FilePageStore.java | 60 +-- .../persistence/file/FilePageStoreManager.java | 114 ++++-- .../file/PersistentStorageIOException.java | 47 +++ .../wal/AbstractWalRecordsIterator.java | 2 +- .../wal/FileWriteAheadLogManager.java | 119 ++---- .../wal/reader/StandaloneGridKernalContext.java | 10 + .../file/IgnitePdsDiskErrorsRecoveringTest.java | 376 +++++++++++++++++++ .../file/IgnitePdsThreadInterruptionTest.java | 143 +++++-- .../db/wal/reader/MockWalIteratorFactory.java | 2 +- .../IgnitePdsWithIndexingCoreTestSuite.java | 2 + 18 files changed, 844 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 30507fe..8d91503 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -224,7 +224,7 @@ public class DataStorageConfiguration implements Serializable { /** Always write full pages. */ private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES; - /** Factory to provide I/O interface for files */ + /** Factory to provide I/O interface for data storage files */ private FileIOFactory fileIOFactory = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, true) ? new AsyncFileIOFactory() : new RandomAccessFileIOFactory(); @@ -824,7 +824,7 @@ public class DataStorageConfiguration implements Serializable { /** * Factory to provide implementation of FileIO interface - * which is used for any file read/write operations + * which is used for data storage files read/write operations * * @return File I/O factory */ @@ -834,7 +834,7 @@ public class DataStorageConfiguration implements Serializable { /** * Sets factory to provide implementation of FileIO interface - * which is used for any file read/write operations + * which is used for data storage files read/write operations * * @param fileIOFactory File I/O factory */ http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ce12b61..a260327 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -655,4 +655,18 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return subscription processor to manage internal-only (strict node-local) subscriptions between components. */ public GridInternalSubscriptionProcessor internalSubscriptionProcessor(); + + /** + * TODO: Should be replaced with proper implementation in https://issues.apache.org/jira/browse/IGNITE-6891 + * + * @return {@code true} if node was invalidated, false in other case. + */ + public boolean invalidated(); + + /** + * Invalidates node. + * + * TODO: Should be replaced with proper implementation in https://issues.apache.org/jira/browse/IGNITE-6891 + */ + public void invalidate(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 36c6231..9a315e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -385,6 +385,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ private GridInternalSubscriptionProcessor internalSubscriptionProc; + /** Node invalidation flag. */ + private volatile boolean invalidated; + /** * No-arg constructor is required by externalization. 
*/ @@ -1092,6 +1095,16 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean invalidated() { + return invalidated; + } + + /** {@inheritDoc} */ + @Override public void invalidate() { + invalidated = true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 232476b..ed31f00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -37,8 +37,11 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Handler; import javax.management.JMException; @@ -387,6 +390,36 @@ public class IgnitionEx { } /** + * Behaves almost the same as {@link IgnitionEx#stop(String, boolean, boolean)}. + * If the node stopping process is not finished within {@code timeoutMs}, the whole JVM will be killed. + * + * @param timeoutMs Timeout to wait for graceful stopping, in milliseconds.
+ */ + public static boolean stop(@Nullable String name, boolean cancel, boolean stopNotStarted, long timeoutMs) { + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + // Schedule delayed node killing if graceful stopping will be not finished within timeout. + executor.schedule(new Runnable() { + @Override + public void run() { + if (state(name) == IgniteState.STARTED) { + U.error(null, "Unable to gracefully stop node within timeout " + timeoutMs + + " milliseconds. Killing node..."); + + // We are not able to kill only one grid so whole JVM will be stopped. + System.exit(Ignition.KILL_EXIT_CODE); + } + } + }, timeoutMs, TimeUnit.MILLISECONDS); + + boolean success = stop(name, cancel, stopNotStarted); + + executor.shutdownNow(); + + return success; + } + + /** * Stops <b>all</b> started grids. If {@code cancel} flag is set to {@code true} then * all jobs currently executing on local node will be interrupted. * If wait parameter is set to {@code true} then grid will wait for all http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/NodeInvalidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/NodeInvalidator.java b/modules/core/src/main/java/org/apache/ignite/internal/NodeInvalidator.java new file mode 100644 index 0000000..b19ec08 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/NodeInvalidator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteLogger; +import org.jetbrains.annotations.NotNull; + +/** + * Temporary functionality to invalidate and stop the node. + * TODO: Should be replaced with proper implementation in https://issues.apache.org/jira/browse/IGNITE-6891 + */ +public class NodeInvalidator { + public static NodeInvalidator INSTANCE = new NodeInvalidator(); + + private static final long STOP_TIMEOUT_MS = 60 * 1000; + + private NodeInvalidator() { + // Empty + } + + public void invalidate(@NotNull GridKernalContext ctx, @NotNull Throwable error) { + if (ctx.invalidated()) + return; + + ctx.invalidate(); + + final String gridName = ctx.igniteInstanceName(); + final IgniteLogger logger = ctx.log(getClass()); + + logger.error("Critical error with " + gridName + " is happened.
" + + "All further operations will be failed and local node will be stopped.", error); + + new Thread("node-stopper") { + @Override public void run() { + IgnitionEx.stop(gridName, true, true, STOP_TIMEOUT_MS); + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 2707a5e..1b46bf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -39,7 +39,7 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh /** * Invoked after checkpoint recover is finished. */ - public void finishRecover(); + public void finishRecover() throws IgniteCheckedException; /** * Callback called when a cache is starting. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java index e38e5f2..3aa50c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/StorageException.java @@ -17,11 +17,12 @@ package org.apache.ignite.internal.pagemem.wal; +import java.io.IOException; import org.apache.ignite.IgniteCheckedException; -import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.NotNull; /** - * + * Exception is needed to distinguish WAL manager critical I/O errors. */ public class StorageException extends IgniteCheckedException { /** */ @@ -31,14 +32,21 @@ public class StorageException extends IgniteCheckedException { * @param msg Error message. * @param cause Error cause. */ - public StorageException(String msg, @Nullable Throwable cause) { + public StorageException(String msg, @NotNull IOException cause) { super(msg, cause); } /** * @param e Cause exception. 
*/ - public StorageException(Exception e) { + public StorageException(IOException e) { super(e); } + + /** + * @param msg Error message + */ + public StorageException(String msg) { + super(msg); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b43827a..f3b11cc 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -76,6 +76,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.NodeInvalidator; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.mem.DirectMemoryProvider; @@ -2805,7 +2806,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } // Wait and check for errors. - doneWriteFut.get(); + try { + doneWriteFut.get(); + } catch (IgniteCheckedException e) { + chp.progress.cpFinishFut.onDone(e); + + // In case of writing error node should be invalidated and stopped. + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + return; + } // Must re-check shutdown flag here because threads may have skipped some pages. 
// If so, we should not put finish checkpoint mark. http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 1153c88..b5f412e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -154,27 +153,28 @@ public class FilePageStore implements PageStore { } /** + * Initializes header and writes it into the file store. * + * @return Next available position in the file to store a data. + * @throws IOException If initialization is failed. */ - private long initFile() { - try { - ByteBuffer hdr = header(type, dbCfg.getPageSize()); + private long initFile() throws IOException { + ByteBuffer hdr = header(type, dbCfg.getPageSize()); - while (hdr.remaining() > 0) - fileIO.write(hdr); - } - catch (IOException e) { - throw new IgniteException("Check file failed.", e); - } + while (hdr.remaining() > 0) + fileIO.write(hdr); //there is 'super' page in every file return headerSize() + dbCfg.getPageSize(); } /** + * Checks that file store has correct header and size. 
* + * @return Next available position in the file to store a data. + * @throws PersistentStorageIOException If check is failed. */ - private long checkFile() throws IgniteCheckedException { + private long checkFile() throws PersistentStorageIOException { try { ByteBuffer hdr = ByteBuffer.allocate(headerSize()).order(ByteOrder.LITTLE_ENDIAN); @@ -186,28 +186,28 @@ public class FilePageStore implements PageStore { long signature = hdr.getLong(); if (SIGNATURE != signature) - throw new IgniteCheckedException("Failed to verify store file (invalid file signature)" + + throw new IOException("Failed to verify store file (invalid file signature)" + " [expectedSignature=" + U.hexLong(SIGNATURE) + ", actualSignature=" + U.hexLong(signature) + ']'); int ver = hdr.getInt(); if (version() != ver) - throw new IgniteCheckedException("Failed to verify store file (invalid file version)" + + throw new IOException("Failed to verify store file (invalid file version)" + " [expectedVersion=" + version() + ", fileVersion=" + ver + "]"); byte type = hdr.get(); if (this.type != type) - throw new IgniteCheckedException("Failed to verify store file (invalid file type)" + + throw new IOException("Failed to verify store file (invalid file type)" + " [expectedFileType=" + this.type + ", actualFileType=" + type + "]"); int pageSize = hdr.getInt(); if (dbCfg.getPageSize() != pageSize) - throw new IgniteCheckedException("Failed to verify store file (invalid page size)" + + throw new IOException("Failed to verify store file (invalid page size)" + " [expectedPageSize=" + dbCfg.getPageSize() + ", filePageSize=" + pageSize + "]"); @@ -217,22 +217,22 @@ public class FilePageStore implements PageStore { fileSize = pageSize + headerSize(); if ((fileSize - headerSize()) % pageSize != 0) - throw new IgniteCheckedException("Failed to verify store file (invalid file size)" + + throw new IOException("Failed to verify store file (invalid file size)" + " [fileSize=" + U.hexLong(fileSize) + ", pageSize=" + 
U.hexLong(pageSize) + ']'); return fileSize; } catch (IOException e) { - throw new IgniteCheckedException("File check failed", e); + throw new PersistentStorageIOException("File check failed", e); } } /** * @param cleanFile {@code True} to delete file. - * @throws IgniteCheckedException If failed. + * @throws PersistentStorageIOException If failed. */ - public void stop(boolean cleanFile) throws IgniteCheckedException { + public void stop(boolean cleanFile) throws PersistentStorageIOException { lock.writeLock().lock(); try { @@ -247,7 +247,7 @@ public class FilePageStore implements PageStore { cfgFile.delete(); } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new PersistentStorageIOException(e); } finally { lock.writeLock().unlock(); @@ -257,7 +257,7 @@ public class FilePageStore implements PageStore { /** * */ - public void truncate(int tag) throws IgniteCheckedException { + public void truncate(int tag) throws PersistentStorageIOException { lock.writeLock().lock(); try { @@ -277,7 +277,7 @@ public class FilePageStore implements PageStore { allocatedTracker.updateTotalAllocatedPages(delta / pageSize); } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new PersistentStorageIOException(e); } finally { lock.writeLock().unlock(); @@ -301,7 +301,7 @@ public class FilePageStore implements PageStore { /** * */ - public void finishRecover() { + public void finishRecover() throws PersistentStorageIOException { lock.writeLock().lock(); try { @@ -320,7 +320,7 @@ public class FilePageStore implements PageStore { recover = false; } catch (IOException e) { - throw new RuntimeException(e); + throw new PersistentStorageIOException("Unable to finish recover", e); } finally { lock.writeLock().unlock(); @@ -378,7 +378,7 @@ public class FilePageStore implements PageStore { PageIO.setCrc(pageBuf, savedCrc32); } catch (IOException e) { - throw new IgniteCheckedException("Read error", e); + throw new PersistentStorageIOException("Read 
error", e); } } @@ -407,7 +407,7 @@ public class FilePageStore implements PageStore { while (len > 0); } catch (IOException e) { - throw new IgniteCheckedException("Read error", e); + throw new PersistentStorageIOException("Read error", e); } } @@ -442,7 +442,9 @@ public class FilePageStore implements PageStore { inited = true; } catch (IOException e) { - throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e); + err = new PersistentStorageIOException("Could not initialize file: " + cfgFile.getName(), e); + + throw err; } finally { if (err != null && fileIO != null) @@ -509,7 +511,7 @@ public class FilePageStore implements PageStore { PageIO.setCrc(pageBuf, 0); } catch (IOException e) { - throw new IgniteCheckedException("Failed to write the page to the file store [pageId=" + pageId + + throw new PersistentStorageIOException("Failed to write the page to the file store [pageId=" + pageId + ", file=" + cfgFile.getAbsolutePath() + ']', e); } finally { @@ -547,7 +549,7 @@ public class FilePageStore implements PageStore { fileIO.force(); } catch (IOException e) { - throw new IgniteCheckedException("Sync error", e); + throw new PersistentStorageIOException("Sync error", e); } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 26e46b2..9549269 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.NodeInvalidator; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; @@ -51,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.U; @@ -190,12 +192,19 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void finishRecover() { - for (CacheStoreHolder holder : idxCacheStores.values()) { - holder.idxStore.finishRecover(); + @Override public void finishRecover() throws IgniteCheckedException { + try { + for (CacheStoreHolder holder : idxCacheStores.values()) { + holder.idxStore.finishRecover(); - for (FilePageStore partStore : holder.partStores) - partStore.finishRecover(); + for (FilePageStore partStore : holder.partStores) + partStore.finishRecover(); + } + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + throw e; } } @@ -309,7 +318,14 @@ 
public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen public void read(int cacheId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException { PageStore store = getStore(cacheId, PageIdUtils.partId(pageId)); - store.read(pageId, pageBuf, keepCrc); + try { + store.read(pageId, pageBuf, keepCrc); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + throw e; + } } /** {@inheritDoc} */ @@ -323,7 +339,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen @Override public void readHeader(int grpId, int partId, ByteBuffer buf) throws IgniteCheckedException { PageStore store = getStore(grpId, partId); - store.readHeader(buf); + try { + store.readHeader(buf); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + throw e; + } } /** {@inheritDoc} */ @@ -352,7 +375,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen PageStore store = getStore(cacheId, partId); - store.write(pageId, pageBuf, tag, calculateCrc); + try { + store.write(pageId, pageBuf, tag, calculateCrc); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + throw e; + } return store; } @@ -396,35 +426,42 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen int grpId, int partitions, AllocatedPageTracker allocatedTracker) throws IgniteCheckedException { - boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir); + try { + boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir); - File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME); + File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME); - if (dirExisted && !idxFile.exists()) - grpsWithoutIdx.add(grpId); + if (dirExisted && !idxFile.exists()) + grpsWithoutIdx.add(grpId); - FilePageStoreFactory pageStoreFactory = new 
FileVersionCheckingFactory( - pageStoreFileIoFactory, pageStoreV1FileIoFactory, igniteCfg.getDataStorageConfiguration()); + FilePageStoreFactory pageStoreFactory = new FileVersionCheckingFactory( + pageStoreFileIoFactory, pageStoreV1FileIoFactory, igniteCfg.getDataStorageConfiguration()); - FilePageStore idxStore = + FilePageStore idxStore = pageStoreFactory.createPageStore( PageMemory.FLAG_IDX, idxFile, allocatedTracker); - FilePageStore[] partStores = new FilePageStore[partitions]; + FilePageStore[] partStores = new FilePageStore[partitions]; - for (int partId = 0; partId < partStores.length; partId++) { - FilePageStore partStore = - pageStoreFactory.createPageStore( - PageMemory.FLAG_DATA, - getPartitionFile(cacheWorkDir, partId), - allocatedTracker); + for (int partId = 0; partId < partStores.length; partId++) { + FilePageStore partStore = + pageStoreFactory.createPageStore( + PageMemory.FLAG_DATA, + getPartitionFile(cacheWorkDir, partId), + allocatedTracker); - partStores[partId] = partStore; + partStores[partId] = partStore; + } + + return new CacheStoreHolder(idxStore, partStores); } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); - return new CacheStoreHolder(idxStore, partStores); + throw e; + } } /** @@ -509,12 +546,26 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** {@inheritDoc} */ @Override public void sync(int grpId, int partId) throws IgniteCheckedException { - getStore(grpId, partId).sync(); + try { + getStore(grpId, partId).sync(); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + + throw e; + } } /** {@inheritDoc} */ @Override public void ensure(int grpId, int partId) throws IgniteCheckedException { - getStore(grpId, partId).ensure(); + try { + getStore(grpId, partId).ensure(); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); + 
+ throw e; + } } /** {@inheritDoc} */ @@ -523,9 +574,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen PageStore store = getStore(grpId, partId); - long pageIdx = store.allocatePage(); + try { + long pageIdx = store.allocatePage(); + + return PageIdUtils.pageId(partId, flags, (int)pageIdx); + } + catch (PersistentStorageIOException e) { + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); - return PageIdUtils.pageId(partId, flags, (int)pageIdx); + throw e; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/PersistentStorageIOException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/PersistentStorageIOException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/PersistentStorageIOException.java new file mode 100644 index 0000000..7b3c303 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/PersistentStorageIOException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.IOException; +import org.apache.ignite.IgniteCheckedException; + +/** + * Exception is needed to distinguish persistent storage I/O errors. + */ +public class PersistentStorageIOException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create an instance of exception. + * + * @param cause Error cause. + */ + public PersistentStorageIOException(IOException cause) { + super(cause); + } + + /** + * Create an instance of exception. + * + * @param msg Error message. + * @param cause Error cause. + */ + public PersistentStorageIOException(String msg, IOException cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 195d181..bf59c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -304,7 +304,7 @@ public abstract class AbstractWalRecordsIterator } return new FileWriteAheadLogManager.ReadFileHandle( - fileIO, desc.idx, sharedCtx.igniteInstanceName(), serializerFactory.createSerializer(serVer), in); + fileIO, desc.idx, serializerFactory.createSerializer(serVer), in); } catch (SegmentEofException | EOFException ignore) { try { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 7b3e938..73751c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -69,7 +69,7 @@ import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.NodeInvalidator; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.StorageException; @@ -115,8 +115,8 @@ import org.jsr166.ConcurrentHashMap8; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; import static org.apache.ignite.configuration.WALMode.LOG_ONLY; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD; import static 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; @@ -212,7 +212,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final int BUF_SIZE = 1024 * 1024; /** Use mapped byte buffer. */ - private static boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true); + private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true); /** {@link FileWriteHandle#written} atomic field updater. */ private static final AtomicLongFieldUpdater<FileWriteHandle> WRITTEN_UPD = @@ -293,9 +293,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Current log segment handle */ private volatile FileWriteHandle currHnd; - /** Environment failure. */ - private volatile Throwable envFailed; - /** * Positive (non-0) value indicates WAL can be archived even if not complete<br> * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> @@ -666,7 +663,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } catch (IgniteCheckedException e) { U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); - handle.invalidateEnvironment(e); + + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); } } @@ -714,7 +712,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else currWrHandle = rollOver(currWrHandle); - checkEnvironment(); + checkNode(); if (isStopping()) throw new IgniteCheckedException("Stopping."); @@ -1107,7 +1105,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle hnd = new FileWriteHandle( fileIO, absIdx, - cctx.igniteInstanceName(), off + len, true, ser, @@ -1161,17 +1158,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lsnr.apply(fileIO); if (mmap) { - try { - MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); + 
MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); - rbuf = new SegmentedRingByteBuffer(buf, metrics); - } - catch (IOException e) { - if (e instanceof ClosedByInterruptException) - throw e; - else - throw new IgniteCheckedException(e); - } + rbuf = new SegmentedRingByteBuffer(buf, metrics); } else rbuf = cur.buf.reset(); @@ -1179,7 +1168,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl hnd = new FileWriteHandle( fileIO, cur.idx + 1, - cctx.igniteInstanceName(), 0, false, serializer, @@ -1221,7 +1209,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return hnd; } catch (IOException e) { - throw new StorageException(e); + StorageException se = new StorageException("Unable to initialize WAL segment", e); + + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), se); + + throw se; } } @@ -1355,12 +1347,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @throws StorageException If environment is no longer valid and we missed a WAL write. + * @throws StorageException If node is no longer valid and we missed a WAL operation. 
*/ - private void checkEnvironment() throws StorageException { - if (envFailed != null) - throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + - "previous error)", envFailed); + private void checkNode() throws StorageException { + if (cctx.kernalContext().invalidated()) + throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " + + "previous error)"); } /** @@ -1903,10 +1895,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (IgniteCheckedException | IOException e) { U.error(log, "Unexpected error during WAL compression", e); - FileWriteHandle handle = currentHandle(); - - if (handle != null) - handle.invalidateEnvironment(e); + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); @@ -2058,10 +2047,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (IOException e) { U.error(log, "Unexpected error during WAL decompression", e); - FileWriteHandle handle = currentHandle(); - - if (handle != null) - handle.invalidateEnvironment(e); + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e); } } } @@ -2339,17 +2325,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Absolute WAL segment file index (incremental counter) */ protected final long idx; - /** */ - protected String gridName; - /** * @param fileIO I/O interface for read/write operations of FileHandle. * @param idx Absolute WAL segment file index (incremental counter). 
*/ - private FileHandle(FileIO fileIO, long idx, String gridName) { + private FileHandle(FileIO fileIO, long idx) { this.fileIO = fileIO; this.idx = idx; - this.gridName = gridName; } } @@ -2378,11 +2360,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ReadFileHandle( FileIO fileIO, long idx, - String gridName, RecordSerializer ser, FileInput in ) { - super(fileIO, idx, gridName); + super(fileIO, idx); this.ser = ser; this.in = in; @@ -2427,9 +2408,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final Lock lock = new ReentrantLock(); - /** Condition activated each time writeBuffer() completes. Used to wait previously flushed write to complete */ - private final Condition writeComplete = lock.newCondition(); - /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()} */ private final Condition fsync = lock.newCondition(); @@ -2454,13 +2432,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private FileWriteHandle( FileIO fileIO, long idx, - String gridName, long pos, boolean resume, RecordSerializer serializer, SegmentedRingByteBuffer buf ) throws IOException { - super(fileIO, idx, gridName); + super(fileIO, idx); assert serializer != null; @@ -2499,7 +2476,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert rec.size() > 0 : rec; for (;;) { - checkEnvironment(); + checkNode(); SegmentedRingByteBuffer.WriteSegment seg; @@ -2818,7 +2795,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lock.lock(); try { - assert envFailed != null || written == lastFsyncPos || mode != WALMode.DEFAULT : + assert cctx.kernalContext().invalidated() || written == lastFsyncPos || mode != WALMode.DEFAULT : "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + idx + ']'; fileIO = null; @@ -2846,40 +2823,6 @@ public class 
FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param e Exception to set as a cause for all further operations. - */ - private void invalidateEnvironment(Throwable e) { - lock.lock(); - - try { - invalidateEnvironmentLocked(e); - } - finally { - writeComplete.signalAll(); - - lock.unlock(); - } - } - - /** - * @param e Exception to set as a cause for all further operations. - */ - private void invalidateEnvironmentLocked(Throwable e) { - if (envFailed == null) { - envFailed = e; - - U.error(log, "IO error encountered while running WAL flush. All further operations " + - " will be failed and local node will be stopped.", e); - - new Thread() { - @Override public void run() { - IgnitionEx.stop(gridName, true, true); - } - }.start(); - } - } - - /** * @return Safely reads current position of the file channel as String. Will return "null" if channel is null. */ private String safePosition() { @@ -3311,7 +3254,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Throwable err = walWriter.err; if (err != null) - currentHandle().invalidateEnvironment(err); + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), err); if (expPos == UNCONDITIONAL_FLUSH) expPos = (currentHandle().buf.tail()); @@ -3350,7 +3293,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert hdl.fileIO != null : "Writing to a closed segment."; - checkEnvironment(); + checkNode(); long lastLogged = U.currentTimeMillis(); @@ -3375,7 +3318,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lastLogged = now; } - checkEnvironment(); + checkNode(); } // Do the write. 
@@ -3398,9 +3341,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert hdl.written == hdl.fileIO.position(); } catch (IOException e) { - hdl.invalidateEnvironmentLocked(e); + StorageException se = new StorageException("Unable to write", e); + + NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), se); - throw new StorageException(e); + throw se; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index fa3f7f3..908d5b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -625,6 +625,16 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public boolean invalidated() { + return false; + } + + /** {@inheritDoc} */ + @Override public void invalidate() { + + } + + /** {@inheritDoc} */ @NotNull @Override public Iterator<GridComponent> iterator() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java new file mode 100644 index 0000000..3511551 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.OpenOption; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.GridKernalState; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; +import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * Tests node recovering after disk errors during interaction with persistent storage. + */ +public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { + /** */ + private static final int PAGE_SIZE = DataStorageConfiguration.DFLT_PAGE_SIZE; + + /** */ + private static final int WAL_SEGMENT_SIZE = 1024 * PAGE_SIZE; + + /** */ + private static final long DFLT_DISK_SPACE_BYTES = Long.MAX_VALUE; + + /** */ + private static final long STOP_TIMEOUT_MS = 30 * 1000; + + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private boolean failPageStoreDiskOperations = false; + + /** */ + private long diskSpaceBytes = DFLT_DISK_SPACE_BYTES; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + + failPageStoreDiskOperations = false; + diskSpaceBytes = DFLT_DISK_SPACE_BYTES; + System.clearProperty(IGNITE_WAL_MMAP); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY) + .setWalCompactionEnabled(false) + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); + + if (failPageStoreDiskOperations) + dsCfg.setFileIOFactory(new LimitedSizeFileIOFactory(new 
RandomAccessFileIOFactory(), diskSpaceBytes)); + + cfg.setDataStorageConfiguration(dsCfg); + + CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME) + .setRebalanceMode(CacheRebalanceMode.NONE) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAffinity(new RendezvousAffinityFunction(false, 1)); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * + */ + public void testRecoveringOnCacheInitError() throws Exception { + failPageStoreDiskOperations = true; + + // Two pages is enough to initialize MetaStorage. + diskSpaceBytes = 2 * PAGE_SIZE; + + final IgniteEx grid = startGrid(0); + + boolean failed = false; + try { + grid.active(true); + } catch (Exception expected) { + log.warning("Expected cache error", expected); + + failed = true; + } + + Assert.assertTrue("Cache initialization must failed", failed); + + // Grid should be automatically stopped after checkpoint fail. + awaitStop(grid); + + // Grid should be successfully recovered after stopping. 
+ failPageStoreDiskOperations = false; + + IgniteEx recoveredGrid = startGrid(0); + recoveredGrid.active(true); + } + + /** + * + */ + public void testRecoveringOnCheckpointWritingError() throws Exception { + failPageStoreDiskOperations = true; + diskSpaceBytes = 1024 * PAGE_SIZE; + + final IgniteEx grid = startGrid(0); + grid.active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + grid.cache(CACHE_NAME).put(i, data); + } + + boolean checkpointFailed = false; + try { + forceCheckpoint(); + } + catch (IgniteCheckedException e) { + for (Throwable t : e.getSuppressed()) + if (t.getCause() != null && t.getCause().getMessage().equals("Not enough space!")) + checkpointFailed = true; + } + + Assert.assertTrue("Checkpoint must be failed by IOException (Not enough space!)", checkpointFailed); + + // Grid should be automatically stopped after checkpoint fail. + awaitStop(grid); + + // Grid should be successfully recovered after stopping. 
+ failPageStoreDiskOperations = false; + + IgniteEx recoveredGrid = startGrid(0); + recoveredGrid.active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i); + Assert.assertArrayEquals(data, actualData); + } + } + + /** + * + */ + public void testRecoveringOnWALErrorWithMmap() throws Exception { + diskSpaceBytes = WAL_SEGMENT_SIZE; + System.setProperty(IGNITE_WAL_MMAP, "true"); + emulateRecoveringOnWALWritingError(); + } + + /** + * + */ + public void testRecoveringOnWALErrorWithoutMmap() throws Exception { + diskSpaceBytes = 2 * WAL_SEGMENT_SIZE; + System.setProperty(IGNITE_WAL_MMAP, "false"); + emulateRecoveringOnWALWritingError(); + } + + /** + * + */ + private void emulateRecoveringOnWALWritingError() throws Exception { + final IgniteEx grid = startGrid(0); + + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)grid.context().cache().context().wal(); + wal.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes)); + + grid.active(true); + + int failedPosition = -1; + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + try { + grid.cache(CACHE_NAME).put(i, data); + } + catch (Exception e) { + failedPosition = i; + + break; + } + } + + // We must be able to put something into cache before fail. + Assert.assertTrue(failedPosition > 0); + + // Grid should be automatically stopped after WAL fail. + awaitStop(grid); + + // Grid should be successfully recovered after stopping. 
+ IgniteEx recoveredGrid = startGrid(0); + recoveredGrid.active(true); + + for (int i = 0; i < failedPosition; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i); + Assert.assertArrayEquals(data, actualData); + } + } + + /** + * + */ + private void awaitStop(final IgniteEx grid) throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(() -> grid.context().gateway().getState() == GridKernalState.STOPPED, STOP_TIMEOUT_MS); + } + + /** + * + */ + private void forceCheckpoint() throws Exception { + for (Ignite ignite : G.allGrids()) { + if (ignite.cluster().localNode().isClient()) + continue; + + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() + .cache().context().database(); + + dbMgr.waitForCheckpoint("test"); + } + } + + /** + * + */ + private static class LimitedSizeFileIO extends FileIODecorator { + /** */ + private final AtomicLong availableSpaceBytes; + + /** + * @param delegate File I/O delegate. + * @param availableSpaceBytes Shared counter which indicates the number of available bytes in a FS. 
+ */ + public LimitedSizeFileIO(FileIO delegate, AtomicLong availableSpaceBytes) { + super(delegate); + this.availableSpaceBytes = availableSpaceBytes; + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf) throws IOException { + int written = super.write(srcBuf); + availableSpaceBytes.addAndGet(-written); + if (availableSpaceBytes.get() < 0) + throw new IOException("Not enough space!"); + return written; + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + int written = super.write(srcBuf, position); + availableSpaceBytes.addAndGet(-written); + if (availableSpaceBytes.get() < 0) + throw new IOException("Not enough space!"); + return written; + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buf, int off, int len) throws IOException { + super.write(buf, off, len); + availableSpaceBytes.addAndGet(-len); + if (availableSpaceBytes.get() < 0) + throw new IOException("Not enough space!"); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + availableSpaceBytes.addAndGet(-maxWalSegmentSize); + if (availableSpaceBytes.get() < 0) + throw new IOException("Not enough space!"); + return super.map(maxWalSegmentSize); + } + } + + private static class LimitedSizeFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** */ + private final FileIOFactory delegate; + + /** */ + private final AtomicLong availableSpaceBytes; + + /** + * @param delegate File I/O factory delegate. + * @param fsSpaceBytes Number of available bytes in FS. 
+ */ + private LimitedSizeFileIOFactory(FileIOFactory delegate, long fsSpaceBytes) { + this.delegate = delegate; + this.availableSpaceBytes = new AtomicLong(fsSpaceBytes); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return new LimitedSizeFileIO(delegate.create(file), availableSpaceBytes); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + return new LimitedSizeFileIO(delegate.create(file, modes), availableSpaceBytes); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java index 6955e32..6cd3c1f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java @@ -21,17 +21,18 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import 
org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jsr166.ThreadLocalRandom8; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; - /** * Test what interruptions of writing threads do not affect PDS. */ @@ -40,12 +41,12 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { private static final int PAGE_SIZE = 1 << 12; // 4096 /** */ - public static final int THREADS_CNT = 10; + public static final int THREADS_CNT = 100; /** * Cache name. */ - private final String cacheName = "cache"; + private final String CACHE_NAME = "cache"; /** */ private volatile boolean stop = false; @@ -54,37 +55,49 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { final IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.setDataStorageConfiguration(memoryConfiguration()); + cfg.setDataStorageConfiguration(storageConfiguration()); + + CacheConfiguration ccfg = new CacheConfiguration<>(CACHE_NAME); + + RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(); + affinityFunction.setPartitions(1); - cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName)); + ccfg.setAffinity(affinityFunction); + + cfg.setCacheConfiguration(ccfg); return cfg; } /** - * @return Memory config. + * @return DataStorage configuration. 
*/ - private DataStorageConfiguration memoryConfiguration() { - return new DataStorageConfiguration() - .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setName("dfltMemPlc") - .setPersistenceEnabled(true) - ) - .setPageSize(PAGE_SIZE) - .setConcurrencyLevel(1) - .setWalMode(WALMode.LOG_ONLY) - .setWalFsyncDelayNanos(0); + private DataStorageConfiguration storageConfiguration() { + DataRegionConfiguration regionCfg = new DataRegionConfiguration() + .setInitialSize(10L * 1024L * 1024L) + .setMaxSize(10L * 1024L * 1024L) + .setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU); + + DataStorageConfiguration cfg = new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setWalFsyncDelayNanos(0) + .setPageSize(PAGE_SIZE) + .setFileIOFactory(new AsyncFileIOFactory()); + + cfg.setDefaultDataRegionConfiguration(regionCfg); + + return cfg; } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { + @Override protected void beforeTest() throws Exception { super.beforeTestsStarted(); deleteWorkFiles(); } /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { + @Override protected void afterTest() throws Exception { super.afterTestsStopped(); stopAllGrids(); @@ -93,10 +106,92 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { } /** - * Tests interruptions on WAL write. + * Tests interruptions on LFS read. * * @throws Exception If failed. 
*/ + public void testInterruptsOnLFSRead() throws Exception { + final Ignite ignite = startGrid(); + + ignite.active(true); + + final int valLen = 8192; + + final byte[] payload = new byte[valLen]; + + final int maxKey = 10_000; + + Thread[] workers = new Thread[THREADS_CNT]; + + + final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); + for (int i=0; i < maxKey; i++) { + cache.put(i, payload); + } + + final AtomicReference<Throwable> fail = new AtomicReference<>(); + + + Runnable clo = new Runnable() { + @Override + public void run() { + cache.get(ThreadLocalRandom8.current().nextInt(maxKey / 5)); + } + }; + for (int i = 0; i < workers.length; i++) { + workers[i] = new Thread(clo); + workers[i].setName("reader-" + i); + workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread t, Throwable e) { + fail.compareAndSet(null, e); + + } + }); + } + + for (Thread worker : workers) + worker.start(); + + //Thread.sleep(3_000); + + // Interrupts should not affect reads. + for (int i = 0;i < workers.length / 2; i++) + workers[i].interrupt(); + + Thread.sleep(3_000); + + stop = true; + + for (Thread worker : workers) + worker.join(); + + Throwable t = fail.get(); + + assert t == null : t; + + + + int verifiedKeys = 0; + + // Post check. + for (int i = 0; i < maxKey; i++) { + byte[] val = (byte[]) cache.get(i); + + if (val != null) { + assertEquals("Illegal length", valLen, val.length); + + verifiedKeys++; + } + } + + log.info("Verified keys: " + verifiedKeys); + } + + /** + * Tests interruptions on WAL write. 
+ * + * @throws Exception + */ public void testInterruptsOnWALWrite() throws Exception { final Ignite ignite = startGrid(); @@ -114,7 +209,7 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { Runnable clo = new Runnable() { @Override public void run() { - IgniteCache<Object, Object> cache = ignite.cache(cacheName); + IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); while (!stop) cache.put(ThreadLocalRandom8.current().nextInt(maxKey), payload); @@ -126,8 +221,6 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { workers[i].setName("writer-" + i); workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { - log.error("Worker thread error", e); - fail.compareAndSet(null, e); } }); @@ -153,7 +246,7 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { assert t == null : t; - IgniteCache<Object, Object> cache = ignite.cache(cacheName); + IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME); int verifiedKeys = 0; @@ -175,6 +268,6 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest { * @throws IgniteCheckedException If fail. 
*/ private void deleteWorkFiles() throws IgniteCheckedException { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java index 8068b08..df649fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java @@ -30,8 +30,8 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.jetbrains.annotations.Nullable; import org.mockito.Mockito; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/14055683/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 8308fd3..9f86e0d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCacheIntegrationTest; +import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest; @@ -57,6 +58,7 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class); suite.addTestSuite(IgniteWalRecoveryPPCTest.class); + suite.addTestSuite(IgnitePdsDiskErrorsRecoveringTest.class); suite.addTestSuite(IgnitePdsBinaryMetadataOnClusterRestartTest.class); suite.addTestSuite(IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class);
