Repository: ignite Updated Branches: refs/heads/master 5feb39c62 -> 8cb35e112
IGNITE-8320 Partition file can be truncated only after checkpoint - Fixes #3985. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8cb35e11 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8cb35e11 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8cb35e11 Branch: refs/heads/master Commit: 8cb35e112f441b3754d651d83562278ba47f73ef Parents: 5feb39c Author: Pavel Kovalenko <[email protected]> Authored: Thu May 17 12:19:36 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu May 17 12:26:06 2018 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 55 ++- .../GridCacheDatabaseSharedManager.java | 383 ++++++++++++++-- .../persistence/GridCacheOffheapManager.java | 44 +- .../cache/persistence/file/FilePageStore.java | 43 +- .../wal/serializer/RecordDataV2Serializer.java | 2 +- .../IgnitePdsCorruptedIndexTest.java | 341 ++++++++++++++ .../IgnitePdsPartitionFilesDestroyTest.java | 444 +++++++++++++++++++ .../IgnitePdsPartitionFilesTruncateTest.java | 153 ------- .../testframework/junits/GridAbstractTest.java | 9 +- .../junits/multijvm/IgniteProcessProxy.java | 23 +- .../ignite/testsuites/IgnitePdsTestSuite.java | 2 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 8 +- .../query/h2/database/InlineIndexHelper.java | 1 - .../IgnitePdsWithIndexingCoreTestSuite.java | 3 + 14 files changed, 1274 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index a199f6c..c54a6cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -88,6 +89,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); + /** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction. */ + private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false); + + /** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */ + static volatile Integer partWhereTestCheckpointEnforced; + /** Maximum size for {@link #rmvQueue}. */ private final int rmvQueueMaxSize; @@ -209,6 +216,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements try { store = grp.offheap().createCacheDataStore(id); + // Log partition creation for further crash recovery purposes. 
+ if (grp.walEnabled()) + ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), updateCounter())); + // Inject row cache cleaner on store creation // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group if (ctx.kernalContext().query().moduleEnabled()) { @@ -1037,6 +1048,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements ctx.database().checkpointReadUnlock(); } } + + if (forceTestCheckpointOnEviction) { + if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) { + ctx.database().forceCheckpoint("test").finishFuture().get(); + + log.warning("Forced checkpoint by test reasons for partition: " + this); + + partWhereTestCheckpointEnforced = id; + } + } } catch (NodeStoppingException e) { if (log.isDebugEnabled()) @@ -1331,10 +1352,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements */ class ClearFuture extends GridFutureAdapter<Boolean> { /** Flag indicates that eviction callback was registered on the current future. */ - private volatile boolean evictionCallbackRegistered; + private volatile boolean evictionCbRegistered; /** Flag indicates that clearing callback was registered on the current future. */ - private volatile boolean clearingCallbackRegistered; + private volatile boolean clearingCbRegistered; /** Flag indicates that future with all callbacks was finished. */ private volatile boolean finished; @@ -1353,25 +1374,29 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param updateSeq If {@code true} update topology sequence after successful eviction. 
*/ private void registerEvictionCallback(boolean updateSeq) { - if (evictionCallbackRegistered) + if (evictionCbRegistered) return; synchronized (this) { // Double check - if (evictionCallbackRegistered) + if (evictionCbRegistered) return; - evictionCallbackRegistered = true; + evictionCbRegistered = true; // Initiates partition eviction and destroy. listen(f -> { - if (f.error() != null) { - rent.onDone(f.error()); - } else if (f.isDone()) { + try { + // Check for errors. + f.get(); + finishEviction(updateSeq); } + catch (Exception e) { + rent.onDone(e); + } - evictionCallbackRegistered = false; + evictionCbRegistered = false; }); } } @@ -1380,21 +1405,21 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * Registers clearing callback on the future. */ private void registerClearingCallback() { - if (clearingCallbackRegistered) + if (clearingCbRegistered) return; synchronized (this) { // Double check - if (clearingCallbackRegistered) + if (clearingCbRegistered) return; - clearingCallbackRegistered = true; + clearingCbRegistered = true; // Recreate cache data store in case of allowed fast eviction, and reset clear flag. 
listen(f -> { clear = false; - clearingCallbackRegistered = false; + clearingCbRegistered = false; }); } } @@ -1447,8 +1472,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements reset(); finished = false; - evictionCallbackRegistered = false; - clearingCallbackRegistered = false; + evictionCbRegistered = false; + clearingCbRegistered = false; } if (evictionRequested) http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 4b183f7..7151d75 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.io.RandomAccessFile; -import java.io.Serializable; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -86,7 +85,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.mem.DirectMemoryProvider; -import org.apache.ignite.internal.mem.DirectMemoryRegion; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; @@ -302,6 +300,9 @@ public class 
GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } }; + /** Timeout between partition file destroy and checkpoint to handle it. */ + private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds. + /** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */ private volatile Checkpointer checkpointer; @@ -395,6 +396,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Initially disabled cache groups. */ private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>(); + /** Initially local wal disabled groups. */ private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>(); /** File I/O factory for writing checkpoint markers. */ @@ -509,6 +511,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final GridKernalContext kernalCtx = cctx.kernalContext(); if (!kernalCtx.clientNode()) { + checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log); + IgnitePageStoreManager store = cctx.pageStore(); assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store; @@ -665,6 +669,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan registrateMetricsMBean(); } + if (checkpointer == null) + checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log); + super.onActivate(ctx); } @@ -1480,8 +1487,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan snapshotMgr.restoreState(); - checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log); - new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start(); CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started"); @@ -1959,20 +1964,23 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @throws 
IgniteCheckedException If failed. * @throws StorageException In case I/O error occurred during operations with storage. */ - private @Nullable WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException { + @Nullable private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException { return restoreMemory(status, false, (PageMemoryEx)metaStorage.pageMemory()); } /** * @param status Checkpoint status. - * @param storeOnly If {@code True} restores Metastorage only. + * @param metastoreOnly If {@code True} restores Metastorage only. * @param storePageMem Metastore page memory. * @throws IgniteCheckedException If failed. * @throws StorageException In case I/O error occurred during operations with storage. */ - private @Nullable WALPointer restoreMemory(CheckpointStatus status, boolean storeOnly, - PageMemoryEx storePageMem) throws IgniteCheckedException { - assert !storeOnly || storePageMem != null; + @Nullable private WALPointer restoreMemory( + CheckpointStatus status, + boolean metastoreOnly, + PageMemoryEx storePageMem + ) throws IgniteCheckedException { + assert !metastoreOnly || storePageMem != null; if (log.isInfoEnabled()) log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked=" @@ -1993,7 +2001,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int applied = 0; WALPointer lastRead = null; - Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() : + Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() : F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps); try (WALIterator it = cctx.wal().replay(status.endPtr)) { @@ -2029,7 +2037,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // several repetitive restarts and the same pages may have changed several times. 
int grpId = pageRec.fullPageId().groupId(); - if (storeOnly && grpId != METASTORAGE_CACHE_ID) + if (metastoreOnly && grpId != METASTORAGE_CACHE_ID) continue; if (!ignoreGrps.contains(grpId)) { @@ -2059,22 +2067,47 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; + case PART_META_UPDATE_STATE: + PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec; + + { + int grpId = metaStateRecord.groupId(); + + if (metastoreOnly && grpId != METASTORAGE_CACHE_ID) + continue; + + if (ignoreGrps.contains(grpId)) + continue; + + int partId = metaStateRecord.partitionId(); + + GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(metaStateRecord.state()); + + if (state == null || state == GridDhtPartitionState.EVICTED) + schedulePartitionDestroy(grpId, partId); + else + cancelOrWaitPartitionDestroy(grpId, partId); + } + + break; + case PARTITION_DESTROY: - PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec; + PartitionDestroyRecord destroyRecord = (PartitionDestroyRecord)rec; - final int gId = destroyRec.groupId(); + { + int grpId = destroyRecord.groupId(); - if (storeOnly && gId != METASTORAGE_CACHE_ID) - continue; + if (metastoreOnly && grpId != METASTORAGE_CACHE_ID) + continue; + + if (ignoreGrps.contains(grpId)) + continue; - if (!ignoreGrps.contains(gId)) { - final int pId = destroyRec.partitionId(); + PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId); - PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? 
storePageMem : getPageMemoryForCacheGroup(gId); + pageMem.invalidate(grpId, destroyRecord.partitionId()); - pageMem.clearAsync( - (grpId, pageId) -> grpId == gId && PageIdUtils.partId(pageId) == pId, - true).get(); + schedulePartitionDestroy(grpId, destroyRecord.partitionId()); } break; @@ -2085,7 +2118,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int grpId = r.groupId(); - if (storeOnly && grpId != METASTORAGE_CACHE_ID) + if (metastoreOnly && grpId != METASTORAGE_CACHE_ID) continue; if (!ignoreGrps.contains(grpId)) { @@ -2118,7 +2151,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - if (storeOnly) + if (metastoreOnly) return null; if (status.needRestoreMemory()) { @@ -2740,6 +2773,170 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Adds given partition to checkpointer destroy queue. + * + * @param grpId Group ID. + * @param partId Partition ID. + */ + public void schedulePartitionDestroy(int grpId, int partId) { + Checkpointer cp = checkpointer; + + if (cp != null) + cp.schedulePartitionDestroy(cctx.cache().cacheGroup(grpId), grpId, partId); + } + + /** + * Cancels or wait for partition destroy. + * + * @param grpId Group ID. + * @param partId Partition ID. + * @throws IgniteCheckedException If failed. + */ + public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { + Checkpointer cp = checkpointer; + + if (cp != null) + cp.cancelOrWaitPartitionDestroy(grpId, partId); + } + + /** + * Partition destroy queue. + */ + private static class PartitionDestroyQueue { + /** */ + private final ConcurrentMap<T2<Integer, Integer>, PartitionDestroyRequest> pendingReqs = + new ConcurrentHashMap<>(); + + /** + * @param grpCtx Group context. + * @param partId Partition ID to destroy. 
+ */ + private void addDestroyRequest(@Nullable CacheGroupContext grpCtx, int grpId, int partId) { + PartitionDestroyRequest req = new PartitionDestroyRequest(grpId, partId); + + PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grpId, partId), req); + + assert old == null || grpCtx == null : "Must wait for old destroy request to finish before adding a new one " + + "[grpId=" + grpId + + ", grpName=" + grpCtx.cacheOrGroupName() + + ", partId=" + partId + ']'; + } + + /** + * @param destroyId Destroy ID. + * @return Destroy request to complete if was not concurrently cancelled. + */ + private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) { + PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId); + + return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null; + } + + /** + * @param grpId Group ID. + * @param partId Partition ID. + * @return Destroy request to wait for if destroy has begun. + */ + private PartitionDestroyRequest cancelDestroy(int grpId, int partId) { + PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(grpId, partId)); + + return rmvd == null ? null : !rmvd.cancel() ? rmvd : null; + } + } + + /** + * Partition destroy request. + */ + private static class PartitionDestroyRequest { + /** */ + private final int grpId; + + /** */ + private final int partId; + + /** Destroy cancelled flag. */ + private boolean cancelled; + + /** Destroy future. Not null if partition destroy has begun. */ + private GridFutureAdapter<Void> destroyFut; + + /** + * @param grpId Group ID. + * @param partId Partition ID. + */ + private PartitionDestroyRequest(int grpId, int partId) { + this.grpId = grpId; + this.partId = partId; + } + + /** + * Cancels partition destroy request. + * + * @return {@code False} if this request needs to be waited for. 
+ */ + private synchronized boolean cancel() { + if (destroyFut != null) { + assert !cancelled; + + return false; + } + + cancelled = true; + + return true; + } + + /** + * Initiates partition destroy. + * + * @return {@code True} if destroy request should be executed, {@code false} otherwise. + */ + private synchronized boolean beginDestroy() { + if (cancelled) { + assert destroyFut == null; + + return false; + } + + if (destroyFut != null) + return false; + + destroyFut = new GridFutureAdapter<>(); + + return true; + } + + /** + * + */ + private synchronized void onDone(Throwable err) { + assert destroyFut != null; + + destroyFut.onDone(err); + } + + /** + * + */ + private void waitCompleted() throws IgniteCheckedException { + GridFutureAdapter<Void> fut; + + synchronized (this) { + assert destroyFut != null; + + fut = destroyFut; + } + + fut.get(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "PartitionDestroyRequest [grpId=" + grpId + ", partId=" + partId + ']'; + } + } + + /** * Checkpointer object is used for notification on checkpoint begin, predicate is {@link #scheduledCp}<code>.nextCpTs - now * > 0 </code>. Method {@link #wakeupForCheckpoint} uses notify, {@link #waitCheckpointEvent} uses wait */ @@ -2752,7 +2949,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private volatile CheckpointProgress scheduledCp; /** Current checkpoint. This field is updated only by checkpoint thread. */ - private volatile CheckpointProgress curCpProgress; + @Nullable private volatile CheckpointProgress curCpProgress; /** Shutdown now. 
*/ private volatile boolean shutdownNow; @@ -2911,7 +3108,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan syncedPagesCntr = new AtomicInteger(); evictedPagesCntr = new AtomicInteger(); - boolean interrupted = true; + boolean success = false; try { if (chp.hasDelta()) { @@ -2998,11 +3195,22 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan snapshotMgr.afterCheckpointPageWritten(); + try { + destroyEvictedPartitions(); + } + catch (IgniteCheckedException e) { + chp.progress.cpFinishFut.onDone(e); + + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + return; + } + // Must mark successful checkpoint only if there are no exceptions or interrupts. - interrupted = false; + success = true; } finally { - if (!interrupted) + if (success) markCheckpointEnd(chp); } @@ -3053,6 +3261,122 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Processes all evicted partitions scheduled for destroy. + * + * @throws IgniteCheckedException If failed. 
+ */ + private void destroyEvictedPartitions() throws IgniteCheckedException { + PartitionDestroyQueue destroyQueue = curCpProgress.destroyQueue; + + if (destroyQueue.pendingReqs.isEmpty()) + return; + + List<PartitionDestroyRequest> reqs = null; + + for (final PartitionDestroyRequest req : destroyQueue.pendingReqs.values()) { + if (!req.beginDestroy()) + continue; + + final int grpId = req.grpId; + final int partId = req.partId; + + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + + assert grp != null + : "Cache group is not initialized [grpId=" + grpId + "]"; + assert grp.offheap() instanceof GridCacheOffheapManager + : "Destroying partition files when persistence is off " + grp.offheap(); + + final GridCacheOffheapManager offheap = (GridCacheOffheapManager) grp.offheap(); + + Runnable destroyPartTask = () -> { + try { + offheap.destroyPartitionStore(grpId, partId); + + req.onDone(null); + + if (log.isDebugEnabled()) + log.debug("Partition file has destroyed [grpId=" + grpId + ", partId=" + partId + "]"); + } + catch (Exception e) { + req.onDone(new IgniteCheckedException( + "Partition file destroy has failed [grpId=" + grpId + ", partId=" + partId + "]", e)); + } + }; + + if (asyncRunner != null) { + try { + asyncRunner.execute(destroyPartTask); + } + catch (RejectedExecutionException ignore) { + // Run the task synchronously. + destroyPartTask.run(); + } + } + else + destroyPartTask.run(); + + if (reqs == null) + reqs = new ArrayList<>(); + + reqs.add(req); + } + + if (reqs != null) + for (PartitionDestroyRequest req : reqs) + req.waitCompleted(); + + destroyQueue.pendingReqs.clear(); + } + + /** + * @param grpCtx Group context. Can be {@code null} in case of crash recovery. + * @param grpId Group ID. + * @param partId Partition ID. 
+ */ + private void schedulePartitionDestroy(@Nullable CacheGroupContext grpCtx, int grpId, int partId) { + synchronized (this) { + scheduledCp.destroyQueue.addDestroyRequest(grpCtx, grpId, partId); + } + + if (log.isDebugEnabled()) + log.debug("Partition file has been scheduled to destroy [grpId=" + grpId + ", partId=" + partId + "]"); + + if (grpCtx != null) + wakeupForCheckpoint(PARTITION_DESTROY_CHECKPOINT_TIMEOUT, "partition destroy"); + } + + /** + * @param grpId Group ID. + * @param partId Partition ID. + */ + private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { + PartitionDestroyRequest req; + + synchronized (this) { + req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId); + } + + if (req != null) + req.waitCompleted(); + + CheckpointProgress cur; + + synchronized (this) { + cur = curCpProgress; + + if (cur != null) + req = cur.destroyQueue.cancelDestroy(grpId, partId); + } + + if (req != null) + req.waitCompleted(); + + if (log.isDebugEnabled()) + log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]"); + } + + /** * */ @SuppressWarnings("WaitNotInLoop") @@ -3197,7 +3521,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - if (hasPages) { + if (hasPages || !curr.destroyQueue.pendingReqs.isEmpty()) { assert cpPtr != null; tracker.onWalCpRecordFsyncStart(); @@ -3672,6 +3996,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private volatile SnapshotOperation snapshotOperation; + /** Partitions destroy queue. */ + private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue(); + /** Wakeup reason. 
*/ private String reason; http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index d7cc623..03e892e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -146,8 +146,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override protected CacheDataStore createCacheDataStore0(final int p) - throws IgniteCheckedException { + @Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException { + if (ctx.database() instanceof GridCacheDatabaseSharedManager) + ((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p); + boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p); return new GridCacheDataStore(p, exists); @@ -573,25 +575,41 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { + assert ctx.database() instanceof GridCacheDatabaseSharedManager + : "Destroying cache data store when persistence is not enabled: " + ctx.database(); + + int partId = store.partId(); + ctx.database().checkpointReadLock(); try { - int p = store.partId(); - saveStoreMetadata(store, null, false, true); - - PageMemoryEx pageMemory = 
(PageMemoryEx)grp.dataRegion().pageMemory(); - - int tag = pageMemory.invalidate(grp.groupId(), p); - - if (grp.walEnabled()) - ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p)); - - ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag); } finally { ctx.database().checkpointReadUnlock(); } + + ((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId); + } + + /** + * Invalidates page memory for given partition. Destroys partition store. + * <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread. + * + * @param grpId Group ID. + * @param partId Partition ID. + * + * @throws IgniteCheckedException If destroy has failed. + */ + public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException { + PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory(); + + int tag = pageMemory.invalidate(grp.groupId(), partId); + + if (grp.walEnabled()) + ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId)); + + ctx.pageStore().onPartitionDestroyed(grpId, partId, tag); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 05f9421..ae4880d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.file.Files; 
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -244,7 +245,7 @@ public class FilePageStore implements PageStore { fileIO.close(); if (cleanFile) - cfgFile.delete(); + Files.delete(cfgFile.toPath()); } catch (IOException e) { throw new PersistentStorageIOException(e); @@ -255,35 +256,34 @@ public class FilePageStore implements PageStore { } /** + * Truncates and deletes partition file and subtracts its pages from the allocation tracker. * + * @param tag New partition tag. + * @throws PersistentStorageIOException If failed. */ public void truncate(int tag) throws PersistentStorageIOException { - lock.writeLock().lock(); + init(); - long pages = this.pages(); + lock.writeLock().lock(); try { - if (!inited) - return; - this.tag = tag; - try { - fileIO.close(); - } - finally { - cfgFile.delete(); - } + fileIO.clear(); + + fileIO.close(); + + Files.delete(cfgFile.toPath()); } catch (IOException e) { - throw new PersistentStorageIOException(e); + throw new PersistentStorageIOException("Failed to delete partition file: " + cfgFile.getPath(), e); } finally { - inited = false; - + allocatedTracker.updateTotalAllocatedPages(-1L * this.pages()); + allocated.set(0); - allocatedTracker.updateTotalAllocatedPages(-1L * pages); + inited = false; lock.writeLock().unlock(); } @@ -325,7 +325,7 @@ public class FilePageStore implements PageStore { recover = false; } catch (IOException e) { - throw new PersistentStorageIOException("Unable to finish recover", e); + throw new PersistentStorageIOException("Failed to finish recover", e); } finally { lock.writeLock().unlock(); @@ -421,9 +421,9 @@ public class FilePageStore implements PageStore { } /** - * @throws IgniteCheckedException If failed to initialize store file. + * @throws PersistentStorageIOException If failed to initialize store file.
*/ - private void init() throws IgniteCheckedException { + private void init() throws PersistentStorageIOException { if (!inited) { lock.writeLock().lock(); @@ -431,7 +431,7 @@ public class FilePageStore implements PageStore { if (!inited) { FileIO fileIO = null; - IgniteCheckedException err = null; + PersistentStorageIOException err = null; try { this.fileIO = fileIO = ioFactory.create(cfgFile, CREATE, READ, WRITE); @@ -451,7 +451,8 @@ public class FilePageStore implements PageStore { inited = true; } catch (IOException e) { - err = new PersistentStorageIOException("Could not initialize file: " + cfgFile.getName(), e); + err = new PersistentStorageIOException( + "Failed to initialize partition file: " + cfgFile.getName(), e); throw err; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index efba611..b3a00be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -92,7 +92,7 @@ public class RecordDataV2Serializer implements RecordDataSerializer { return 8 + 1; case EXCHANGE: - return 4 /*type*/ + 8 /*timestamp*/ + 2 /*constId*/; + return 4 /*type*/ + 8 /*timestamp*/ + 2 /*constId*/; case TX_RECORD: return txRecordSerializer.size((TxRecord)rec); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java new file mode 100644 index 0000000..a1065f6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedIndexTest.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.cache.persistence; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.OpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy; +import org.junit.Assert; + +/** + * Test to reproduce corrupted indexes problem after partition file eviction and truncation. 
+ */ +public class IgnitePdsCorruptedIndexTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE = "cache"; + + /** Flag indicates that {@link HaltOnTruncateFileIO} should be used. */ + private boolean haltFileIO; + + /** MultiJVM flag. */ + private boolean multiJvm = true; + + /** Additional remote JVM args. */ + private List<String> additionalArgs = Collections.emptyList(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointFrequency(10 * 60 * 1000) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(512 * 1024 * 1024) + .setPersistenceEnabled(true) + ); + + if (haltFileIO) + dsCfg.setFileIOFactory(new HaltOnTruncateFileIOFactory(new RandomAccessFileIOFactory())); + + cfg.setDataStorageConfiguration(dsCfg); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE) + .setBackups(1) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setIndexedTypes(Integer.class, IndexedObject.class, Long.class, IndexedObject.class) + .setAffinity(new RendezvousAffinityFunction(false, 32)); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected boolean isMultiJvm() { + return multiJvm; + } + + /** {@inheritDoc} */ + @Override protected List<String> additionalRemoteJvmArgs() { + return additionalArgs; + } + + /** + * + */ + public void testCorruption() 
throws Exception { + final String corruptedNodeName = "corrupted"; + + IgniteEx ignite = startGrid(0); + + haltFileIO = true; + + additionalArgs = new ArrayList<>(); + additionalArgs.add("-D" + "TEST_CHECKPOINT_ON_EVICTION=true"); + additionalArgs.add("-D" + "IGNITE_QUIET=false"); + + IgniteEx corruptedNode = (IgniteEx) startGrid(corruptedNodeName); + + additionalArgs.clear(); + + haltFileIO = false; + + startGrid(2); + + ignite.cluster().active(true); + + awaitPartitionMapExchange(); + + final int entityCnt = 3_200; + + try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(CACHE)) { + streamer.allowOverwrite(true); + + for (int i = 0; i < entityCnt; i++) + streamer.addData(i, new IndexedObject(i)); + } + + startGrid(3); + + resetBaselineTopology(); + + // Corrupted node should be halted during partition destroy. + GridTestUtils.waitForCondition(() -> ignite.cluster().nodes().size() == 3, getTestTimeout()); + + // Clear remote JVM instance cache. + IgniteProcessProxy.kill(corruptedNode.name()); + + stopAllGrids(); + + // Disable multi-JVM mode and start coordinator and corrupted node in the same JVM. + multiJvm = false; + + startGrid(0); + + corruptedNode = (IgniteEx) startGrid(corruptedNodeName); + + corruptedNode.cluster().active(true); + + resetBaselineTopology(); + + // If index was corrupted, rebalance or one of the following queries should be failed. + awaitPartitionMapExchange(); + + for (int k = 0; k < entityCnt; k += entityCnt / 4) { + IgniteCache<Integer, IndexedObject> cache1 = corruptedNode.cache(CACHE); + + int l = k; + int r = k + entityCnt / 4 - 1; + + log.info("Check range [" + l + "-" + r + "]"); + + QueryCursor<Cache.Entry<Long, IndexedObject>> qry = + cache1.query(new SqlQuery(IndexedObject.class, "lVal between ? 
and ?") + .setArgs(l * l, r * r)); + + Collection<Cache.Entry<Long, IndexedObject>> queried = qry.getAll(); + + log.info("Qry result size = " + queried.size()); + } + } + + /** + * + */ + private static class IndexedObject { + /** Integer indexed value. */ + @QuerySqlField(index = true) + private int iVal; + + /** Long indexed value. */ + @QuerySqlField(index = true) + private long lVal; + + /** */ + private byte[] payload = new byte[1024]; + + /** + * @param iVal Integer value. + */ + private IndexedObject(int iVal) { + this.iVal = iVal; + this.lVal = (long) iVal * iVal; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof IndexedObject)) + return false; + + IndexedObject that = (IndexedObject)o; + + return iVal == that.iVal; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return iVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexedObject.class, this); + } + } + + /** + * File I/O which halts JVM after specified file truncation. + */ + private static class HaltOnTruncateFileIO extends FileIODecorator { + /** File. */ + private final File file; + + /** The overall number of file truncations have done. 
+ */ + private static final AtomicInteger truncations = new AtomicInteger(); + + /** + * @param delegate File I/O delegate. + */ + public HaltOnTruncateFileIO(FileIO delegate, File file) { + super(delegate); + this.file = file; + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + super.clear(); + + System.err.println("Truncated file: " + file.getAbsolutePath()); + + truncations.incrementAndGet(); + + Integer checkpointedPart = null; + try { + Field field = GridDhtLocalPartition.class.getDeclaredField("partWhereTestCheckpointEnforced"); + + field.setAccessible(true); + + checkpointedPart = (Integer) field.get(null); + } + catch (Exception e) { + e.printStackTrace(); + } + + // Halt once more than one file is truncated and the checkpoint on partition eviction is done. + if (truncations.get() > 1 && checkpointedPart != null) { + System.err.println("JVM is going to be crushed for test reasons..."); + + Runtime.getRuntime().halt(0); + } + } + } + + /** + * I/O Factory which creates {@link HaltOnTruncateFileIO} instances for partition files. + */ + private static class HaltOnTruncateFileIOFactory implements FileIOFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** Delegate factory. */ + private final FileIOFactory delegateFactory; + + /** + * @param delegateFactory Delegate factory. + */ + HaltOnTruncateFileIOFactory(FileIOFactory delegateFactory) { + this.delegateFactory = delegateFactory; + } + + /** + * @param file File; treated as a partition file if its name contains {@code part} and ends with {@code bin}. + */ + private static boolean isPartitionFile(File file) { + return file.getName().contains("part") && file.getName().endsWith("bin"); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + FileIO delegate = delegateFactory.create(file); + + if (isPartitionFile(file)) + return new HaltOnTruncateFileIO(delegate, file); + + return delegate; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption...
modes) throws IOException { + FileIO delegate = delegateFactory.create(file, modes); + + if (isPartitionFile(file)) + return new HaltOnTruncateFileIO(delegate, file); + + return delegate; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java new file mode 100644 index 0000000..b5afddf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesDestroyTest.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.cache.persistence; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * Test class to check that partition files after eviction are destroyed correctly on next checkpoint or crash 
recovery. + */ +public class IgnitePdsPartitionFilesDestroyTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE = "cache"; + + /** Partitions count. */ + private static final int PARTS_CNT = 32; + + /** Set if I/O exception should be thrown on partition file truncation. */ + private boolean failFileIo; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointFrequency(10 * 60 * 1000) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(512 * 1024 * 1024) + .setPersistenceEnabled(true) + ); + + if (failFileIo) + dsCfg.setFileIOFactory(new FailingFileIOFactory(new RandomAccessFileIOFactory())); + + cfg.setDataStorageConfiguration(dsCfg); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE) + .setBackups(1) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT)); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + failFileIo = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { + return new StopNodeFailureHandler(); + } + + /** + * @param ignite Ignite. + * @param keysCnt Keys count. 
+ */ + private void loadData(IgniteEx ignite, int keysCnt, int multiplier) { + log.info("Load data: keys=" + keysCnt); + + try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(CACHE)) { + streamer.allowOverwrite(true); + + for (int k = 0; k < keysCnt; k++) + streamer.addData(k, k * multiplier); + } + } + + /** + * @param ignite Ignite. + * @param keysCnt Keys count. + */ + private void checkData(IgniteEx ignite, int keysCnt, int multiplier) { + log.info("Check data: " + ignite.name() + ", keys=" + keysCnt); + + IgniteCache<Integer, Integer> cache = ignite.cache(CACHE); + + for (int k = 0; k < keysCnt; k++) + Assert.assertEquals("node = " + ignite.name() + ", key = " + k, (Integer) (k * multiplier), cache.get(k)); + } + + /** + * Test that partition files have been deleted correctly on next checkpoint. + * + * @throws Exception If failed. + */ + public void testPartitionFileDestroyAfterCheckpoint() throws Exception { + IgniteEx crd = (IgniteEx) startGrids(2); + + crd.cluster().active(true); + + int keysCnt = 50_000; + + loadData(crd, keysCnt, 1); + + startGridsMultiThreaded(2, 2); + + // Trigger partitions eviction. + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(crd, true); + + // This checkpoint should delete partition files. + forceCheckpoint(); + + checkPartitionFiles(crd, false); + + for (Ignite ignite : G.allGrids()) + checkData((IgniteEx) ignite, keysCnt, 1); + } + + /** + * Test that partition files are reused correctly. + * + * @throws Exception If failed. + */ + public void testPartitionFileDestroyAndRecreate() throws Exception { + IgniteEx crd = startGrid(0); + + IgniteEx node = startGrid(1); + + crd.cluster().active(true); + + int keysCnt = 50_000; + + loadData(crd, keysCnt, 1); + + startGridsMultiThreaded(2, 2); + + // Trigger partitions eviction. + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(node, true); + + // Trigger partitions re-create.
+ stopGrid(2); + + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(node, true); + + // Rewrite data. + loadData(crd, keysCnt, 2); + + // Force checkpoint on all nodes. + forceCheckpoint(); + + // Check that all unnecessary partition files have been deleted. + checkPartitionFiles(node, false); + + for (Ignite ignite : G.allGrids()) + checkData((IgniteEx) ignite, keysCnt, 2); + } + + /** + * Test that partition files have been deleted correctly during crash recovery. + * + * @throws Exception If failed. + */ + public void testPartitionFileDestroyCrashRecovery1() throws Exception { + IgniteEx crd = startGrid(0); + + failFileIo = true; + + IgniteEx problemNode = startGrid(1); + + failFileIo = false; + + crd.cluster().active(true); + + int keysCnt = 50_000; + + loadData(crd, keysCnt, 1); + + startGridsMultiThreaded(2, 2); + + // Trigger partitions eviction. + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(problemNode, true); + + try { + forceCheckpoint(problemNode); + + Assert.assertTrue("Checkpoint must be failed", false); + } + catch (Exception expected) { + expected.printStackTrace(); + } + + // Problem node should be stopped after failed checkpoint. + waitForTopology(3); + + problemNode = startGrid(1); + + awaitPartitionMapExchange(); + + // After recovery all evicted partition files should be deleted from disk. + checkPartitionFiles(problemNode, false); + + for (Ignite ignite : G.allGrids()) + checkData((IgniteEx) ignite, keysCnt, 1); + } + + /** + * Test that partition files are not deleted if they were re-created in the meantime + * and no checkpoint has been done during this time. + * + * @throws Exception If failed.
+ */ + public void testPartitionFileDestroyCrashRecovery2() throws Exception { + IgniteEx crd = startGrid(0); + + failFileIo = true; + + IgniteEx problemNode = startGrid(1); + + failFileIo = false; + + crd.cluster().active(true); + + int keysCnt = 50_000; + + loadData(crd, keysCnt, 1); + + // Trigger partitions eviction. + startGridsMultiThreaded(2, 2); + + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(problemNode, true); + + // Trigger partitions re-create. + stopGrid(2); + + resetBaselineTopology(); + + awaitPartitionMapExchange(true, true, null); + + checkPartitionFiles(problemNode, true); + + try { + forceCheckpoint(problemNode); + + Assert.assertTrue("Checkpoint must be failed", false); + } + catch (Exception expected) { + expected.printStackTrace(); + } + + // Problem node should be stopped after failed checkpoint. + waitForTopology(2); + + problemNode = startGrid(1); + + awaitPartitionMapExchange(); + + // After recovery all evicted partition files should be deleted from disk. + checkPartitionFiles(problemNode, false); + + for (Ignite ignite : G.allGrids()) + checkData((IgniteEx) ignite, keysCnt, 1); + } + + /** + * If {@code exists} is {@code true}, checks that all partition files exist + * if partition has state EVICTED. + * + * If {@code exists} is {@code false}, checks that all partition files don't exist + * if partition is absent or has state EVICTED. + * + * @param ignite Node. + * @param exists If {@code true} method will check that partition file exists, + * in other case will check that file doesn't exist. + * @throws IgniteCheckedException If failed. 
+ */ + private void checkPartitionFiles(IgniteEx ignite, boolean exists) throws IgniteCheckedException { + int evicted = 0; + + GridDhtPartitionTopology top = ignite.cachex(CACHE).context().topology(); + + for (int p = 0; p < PARTS_CNT; p++) { + GridDhtLocalPartition part = top.localPartition(p); + + File partFile = partitionFile(ignite, CACHE, p); + + if (exists) { + if (part != null && part.state() == GridDhtPartitionState.EVICTED) { + Assert.assertTrue("Partition file has deleted ahead of time: " + partFile, partFile.exists()); + evicted++; + } + } + else { + if (part == null || part.state() == GridDhtPartitionState.EVICTED) + Assert.assertTrue("Partition file has not deleted: " + partFile, !partFile.exists()); + } + } + + if (exists) + Assert.assertTrue("There should be at least 1 eviction", evicted > 0); + } + + /** + * @param ignite Ignite. + * @param cacheName Cache name. + * @param partId Partition id. + */ + private static File partitionFile(Ignite ignite, String cacheName, int partId) throws IgniteCheckedException { + File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false); + + String nodeName = ignite.name().replaceAll("\\.", "_"); + + return new File(dbDir, String.format("%s/cache-%s/part-%d.bin", nodeName, cacheName, partId)); + } + + /** + * File I/O which fails every {@link #clear()} call with an {@link IOException}. + */ + static class FailingFileIO extends FileIODecorator { + /** + * @param delegate File I/O delegate + */ + public FailingFileIO(FileIO delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + throw new IOException("Test"); + } + } + + /** + * I/O factory which wraps partition files into {@link FailingFileIO}. + */ + static class FailingFileIOFactory implements FileIOFactory { + /** Delegate factory. */ + private final FileIOFactory delegateFactory; + + /** + * @param delegateFactory Delegate factory. + */ + FailingFileIOFactory(FileIOFactory delegateFactory) { + this.delegateFactory = delegateFactory; + } + + /** + * @param file File.
+ */ + private static boolean isPartitionFile(File file) { + return file.getName().contains("part") && file.getName().endsWith("bin"); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + FileIO delegate = delegateFactory.create(file); + + if (isPartitionFile(file)) + return new FailingFileIO(delegate); + + return delegate; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegate = delegateFactory.create(file, modes); + + if (isPartitionFile(file)) + return new FailingFileIO(delegate); + + return delegate; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java deleted file mode 100644 index 78c2453..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsPartitionFilesTruncateTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Checks that evicted partitions doesn't leave files in PDS. 
- */ -public class IgnitePdsPartitionFilesTruncateTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setConsistentId(gridName) - .setDataStorageConfiguration(new DataStorageConfiguration() - .setPageSize(1024) - .setWalMode(WALMode.LOG_ONLY) - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setPersistenceEnabled(true))) - .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME) - .setBackups(1) - .setAffinity(new RendezvousAffinityFunction(false, 32))); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** - * @throws Exception If failed. - */ - public void testTruncatingPartitionFilesOnEviction() throws Exception { - Ignite ignite0 = startGrids(3); - - ignite0.cluster().active(true); - - try (IgniteDataStreamer<Integer,String> streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) { - for (int i = 0; i < 1_000; i++) - streamer.addData(i, "Value " + i); - } - - assertEquals(1, ignite0.cacheNames().size()); - - awaitPartitionMapExchange(true, true, null); - - checkPartFiles(0); - checkPartFiles(1); - checkPartFiles(2); - - stopGrid(2); - - ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion()); - - awaitPartitionMapExchange(true, true, null); - - checkPartFiles(0); - checkPartFiles(1); - - startGrid(2); - - ignite0.cluster().setBaselineTopology(ignite0.cluster().topologyVersion()); - - awaitPartitionMapExchange(true, true, null); - - checkPartFiles(0); - checkPartFiles(1); - checkPartFiles(2); - } - - /** - * @param idx Node index. 
- */ - private void checkPartFiles(int idx) throws Exception { - Ignite ignite = grid(idx); - - int[] parts = ignite.affinity(DEFAULT_CACHE_NAME).allPartitions(ignite.cluster().localNode()); - - Path dirPath = Paths.get(U.defaultWorkDirectory(), "db", - U.maskForFileName(ignite.configuration().getIgniteInstanceName()), "cache-" + DEFAULT_CACHE_NAME); - - info("Path: " + dirPath.toString()); - - assertTrue(Files.exists(dirPath)); - - for (Path f : Files.newDirectoryStream(dirPath)) { - if (f.getFileName().toString().startsWith("part-")) - assertTrue("Node_" + idx +" should contains only partitions " + Arrays.toString(parts) - + ", but the file is redundant: " + f.getFileName(), anyMatch(parts, f)); - } - } - - /** */ - private boolean anyMatch(int[] parts, Path f) { - Pattern ptrn = Pattern.compile("part-(\\d+).bin"); - Matcher matcher = ptrn.matcher(f.getFileName().toString()); - - if (!matcher.find()) - throw new IllegalArgumentException("File is not a partition:" + f.getFileName()); - - int part = Integer.parseInt(matcher.group(1)); - - for (int p: parts) { - if (p == part) - return true; - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 5566137..cc54d23 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1015,7 +1015,14 @@ public abstract class GridAbstractTest extends TestCase { } } - return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery); + return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery, 
additionalRemoteJvmArgs()); + } + + /** + * @return Additional JVM args for remote instances. + */ + protected List<String> additionalRemoteJvmArgs() { + return Collections.emptyList(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index fb59ae2..1eb7ddb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -19,6 +19,8 @@ package org.apache.ignite.testframework.junits.multijvm; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -139,10 +141,28 @@ public class IgniteProcessProxy implements IgniteEx { * @param cfg Configuration. * @param log Logger. * @param locJvmGrid Local JVM grid. + * @throws Exception On error. + */ + public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid, boolean discovery) + throws Exception { + this(cfg, log, locJvmGrid, discovery, Collections.emptyList()); + } + + + /** + * @param cfg Configuration. + * @param log Logger. + * @param locJvmGrid Local JVM grid. * @param resetDiscovery Reset DiscoverySpi at the configuration. * @throws Exception On error. 
*/ - public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid, boolean resetDiscovery) + public IgniteProcessProxy( + IgniteConfiguration cfg, + IgniteLogger log, + Ignite locJvmGrid, + boolean resetDiscovery, + List<String> additionalArgs + ) throws Exception { this.cfg = cfg; this.locJvmGrid = locJvmGrid; @@ -151,6 +171,7 @@ public class IgniteProcessProxy implements IgniteEx { String params = params(cfg, resetDiscovery); Collection<String> filteredJvmArgs = filteredJvmArgs(); + filteredJvmArgs.addAll(additionalArgs); final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index ab81d8f..ceb0f0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -93,6 +93,8 @@ public class IgnitePdsTestSuite extends TestSuite { /** * Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations. * + * NOTE: These tests are also executed using I/O plugins. + * * @param suite suite to add tests into. 
*/ public static void addRealPageStoreTests(TestSuite suite) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 76cfe4f..5a2abe8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorrupte import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesTruncateTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitionFilesDestroyTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest; import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest; @@ -96,18 +96,20 @@ public class IgnitePdsTestSuite2 extends TestSuite { // Integrity test. suite.addTestSuite(IgnitePdsRecoveryAfterFileCorruptionTest.class); + + suite.addTestSuite(IgnitePdsPartitionFilesDestroyTest.class); } /** * Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations. * + * NOTE: These tests are also executed using I/O plugins. 
+ * * @param suite suite to add tests into. */ public static void addRealPageStoreTests(TestSuite suite) { suite.addTestSuite(IgnitePdsPageSizesTest.class); - suite.addTestSuite(IgnitePdsPartitionFilesTruncateTest.class); - // Metrics test. suite.addTestSuite(IgniteDataStorageMetricsSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java index 5c42d0d..3419127 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/InlineIndexHelper.java @@ -61,7 +61,6 @@ public class InlineIndexHelper { Value.SHORT, Value.INT, Value.LONG, - Value.LONG, Value.FLOAT, Value.DOUBLE, Value.DATE, http://git-wip-us.apache.org/repos/asf/ignite/blob/8cb35e11/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 6232362..1cb777a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCa import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheHistoricalRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest; @@ -78,6 +79,8 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(IgnitePdsThreadInterruptionTest.class); suite.addTestSuite(IgnitePdsBinarySortObjectFieldsTest.class); + suite.addTestSuite(IgnitePdsCorruptedIndexTest.class); + return suite; } }
