Repository: ignite Updated Branches: refs/heads/master ea8bfac60 -> 8826fa13e
IGNITE-8017 Disable WAL during initial rebalancing Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8826fa13 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8826fa13 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8826fa13 Branch: refs/heads/master Commit: 8826fa13edd0ab776b9fa2ab482b5a216b99b103 Parents: ea8bfac Author: Ilya Lantukh <[email protected]> Authored: Wed Apr 18 17:52:20 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Apr 18 17:52:20 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 + .../processors/cache/CacheGroupContext.java | 54 +- .../processors/cache/WalStateManager.java | 160 +++++- .../dht/GridClientPartitionTopology.java | 5 + .../dht/GridDhtPartitionTopology.java | 7 + .../dht/GridDhtPartitionTopologyImpl.java | 27 + .../dht/preloader/GridDhtPartitionDemander.java | 24 +- .../GridDhtPartitionsExchangeFuture.java | 2 + .../GridCacheDatabaseSharedManager.java | 63 +- .../IgniteCacheDatabaseSharedManager.java | 4 +- .../persistence/file/FilePageStoreManager.java | 18 +- ...lWalModeChangeDuringRebalancingSelfTest.java | 571 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 13 files changed, 895 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 437f49f..32fed05 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -878,6 +878,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD = "IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD"; /** + * When set to {@code true}, WAL will be automatically disabled during rebalancing if there is no partition in + * OWNING state. + * Default is {@code false}. + */ + public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 12636f3..849ecc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -152,7 +152,10 @@ public class CacheGroupContext { private CacheGroupMetricsMXBean mxBean; /** */ - private volatile boolean walEnabled; + private volatile boolean localWalEnabled; + + /** */ + private volatile boolean globalWalEnabled; /** * @param grpId Group ID. @@ -196,9 +199,10 @@ public class CacheGroupContext { this.reuseList = reuseList; this.locStartVer = locStartVer; this.cacheType = cacheType; - this.walEnabled = walEnabled; + this.globalWalEnabled = walEnabled; + this.localWalEnabled = true; - persistWalState(walEnabled); + persistGlobalWalState(walEnabled); ioPlc = cacheType.ioPolicy(); @@ -1021,22 +1025,52 @@ public class CacheGroupContext { * WAL enabled flag. 
*/ public boolean walEnabled() { - return walEnabled; + return localWalEnabled && globalWalEnabled; + } + + /** + * Local WAL enabled flag. + */ + public boolean localWalEnabled() { + return localWalEnabled; + } + + /** + * @Global WAL enabled flag. + */ + public boolean globalWalEnabled() { + return globalWalEnabled; } /** - * @param enabled WAL enabled flag. + * @param enabled Global WAL enabled flag. */ - public void walEnabled(boolean enabled) { - persistWalState(enabled); + public void globalWalEnabled(boolean enabled) { + persistGlobalWalState(enabled); - this.walEnabled = enabled; + this.globalWalEnabled = enabled; + } + + /** + * @param enabled Local WAL enabled flag. + */ + public void localWalEnabled(boolean enabled) { + persistLocalWalState(enabled); + + this.localWalEnabled = enabled; + } + + /** + * @param enabled Enabled flag.. + */ + private void persistGlobalWalState(boolean enabled) { + shared().database().walEnabled(grpId, enabled, false); } /** * @param enabled Enabled flag.. 
*/ - private void persistWalState(boolean enabled) { - shared().database().walEnabled(grpId, enabled); + private void persistLocalWalState(boolean enabled) { + shared().database().walEnabled(grpId, enabled, true); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 64a6819..4a14730 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; @@ -26,10 +28,13 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; 
+import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -44,15 +49,18 @@ import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import java.util.Set; import java.util.UUID; import static org.apache.ignite.internal.GridTopic.TOPIC_WAL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** * Write-ahead log state manager. Manages WAL enable and disable. @@ -102,6 +110,9 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { /** Disconnected flag. */ private boolean disconnected; + /** Holder for groups with temporary disabled WAL. */ + private volatile TemporaryDisabledWal tmpDisabledWal; + /** * Constructor. * @@ -328,6 +339,126 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { } /** + * Change local WAL state before exchange is done. This method will disable WAL for groups without partitions + * in OWNING state if such feature is enabled. + * + * @param topVer Topology version. 
+ */ + public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer) { + if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, false)) + return; + + Set<Integer> grpsToEnableWal = new HashSet<>(); + Set<Integer> grpsToDisableWal = new HashSet<>(); + Set<Integer> grpsWithWalDisabled = new HashSet<>(); + + boolean hasNonEmptyOwning = false; + + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || !grp.affinityNode() || !grp.persistenceEnabled()) + continue; + + boolean hasOwning = false; + + for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) { + if (locPart.state() == OWNING) { + hasOwning = true; + + if (hasNonEmptyOwning) + break; + + if (locPart.updateCounter() > 0) { + hasNonEmptyOwning = true; + + break; + } + } + } + + if (hasOwning && !grp.localWalEnabled()) { + grpsToEnableWal.add(grp.groupId()); + } + else if (!hasOwning && grp.localWalEnabled()) { + grpsToDisableWal.add(grp.groupId()); + + grpsWithWalDisabled.add(grp.groupId()); + } + else if (!grp.localWalEnabled()) + grpsWithWalDisabled.add(grp.groupId()); + } + + tmpDisabledWal = new TemporaryDisabledWal(grpsWithWalDisabled, topVer); + + if (grpsToEnableWal.isEmpty() && grpsToDisableWal.isEmpty()) + return; + + try { + if (hasNonEmptyOwning && !grpsToEnableWal.isEmpty()) + triggerCheckpoint(0).finishFuture().get(); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + + for (Integer grpId : grpsToEnableWal) + cctx.cache().cacheGroup(grpId).localWalEnabled(true); + + for (Integer grpId : grpsToDisableWal) + cctx.cache().cacheGroup(grpId).localWalEnabled(false); + } + + /** + * Callback when group rebalancing is finished. If there are no pending groups, it should trigger checkpoint and + * change partition states. + * @param grpId Group ID. + * @param topVer Topology version. 
+ */ + public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion topVer) { + TemporaryDisabledWal session0 = tmpDisabledWal; + + if (session0 == null || !session0.topVer.equals(topVer)) + return; + + session0.remainingGrps.remove(grpId); + + if (session0.remainingGrps.isEmpty()) { + synchronized (mux) { + if (tmpDisabledWal != session0) + return; + + for (Integer grpId0 : session0.disabledGrps) { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId0); + + assert grp != null; + + if (!grp.localWalEnabled()) + grp.localWalEnabled(true); + } + + tmpDisabledWal = null; + } + + CheckpointFuture cpFut = triggerCheckpoint(0); + + assert cpFut != null; + + cpFut.finishFuture().listen(new IgniteInClosureX<IgniteInternalFuture>() { + @Override public void applyx(IgniteInternalFuture future) { + for (Integer grpId0 : session0.disabledGrps) { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId0); + + assert grp != null; + + grp.topology().ownMoving(session0.topVer); + } + + cctx.exchange().refreshPartitions(); + } + }); + } + } + + /** * Handle propose message in discovery thread. * * @param msg Message. @@ -455,7 +586,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { "no longer exist: " + msg.caches().keySet()); } else { - if (F.eq(msg.enable(), grpCtx.walEnabled())) + if (F.eq(msg.enable(), grpCtx.globalWalEnabled())) // Nothing changed -> no-op. res = new WalStateResult(msg, false); else { @@ -468,7 +599,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { cpFut.beginFuture().get(); if (msg.enable()) { - grpCtx.walEnabled(true); + grpCtx.globalWalEnabled(true); // Enable: it is enough to release cache operations once mark is finished because // not-yet-flushed dirty pages have been logged. @@ -489,7 +620,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { // WAL state is persisted after checkpoint if finished. 
Otherwise in case of crash // and restart we will think that WAL is enabled, but data might be corrupted. - grpCtx.walEnabled(false); + grpCtx.globalWalEnabled(false); } } catch (Exception e) { @@ -917,4 +1048,27 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { onCompletedLocally(res); } } + + /** + * + */ + private static class TemporaryDisabledWal { + /** Groups with disabled WAL. */ + private final Set<Integer> disabledGrps; + + /** Remaining groups. */ + private final Set<Integer> remainingGrps; + + /** Topology version*/ + private final AffinityTopologyVersion topVer; + + /** */ + public TemporaryDisabledWal( + Set<Integer> disabledGrps, + AffinityTopologyVersion topVer) { + this.disabledGrps = Collections.unmodifiableSet(disabledGrps); + this.remainingGrps = new HashSet<>(disabledGrps); + this.topVer = topVer; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 3e3bb0d..dcb8b96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -1104,6 +1104,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void ownMoving(AffinityTopologyVersion topVer) { + // No-op + } + + /** {@inheritDoc} */ @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { assert updateSeq || 
lock.isWriteLockedByCurrentThread(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d586a94d..2df2e89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -357,6 +357,13 @@ public interface GridDhtPartitionTopology { public boolean own(GridDhtLocalPartition part); /** + * Owns all moving partitions for the given topology version. + * + * @param topVer Topology version. + */ + public void ownMoving(AffinityTopologyVersion topVer); + + /** * @param part Evicted partition. * @param updateSeq Update sequence increment flag. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 164f0bf..68104a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -2410,6 +2410,33 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void ownMoving(AffinityTopologyVersion topVer) { + lock.writeLock().lock(); + + try { + for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) { + if (locPart.state() == MOVING) { + boolean reserved = locPart.reserve(); + + try { + if (reserved && locPart.state() == MOVING && lastTopChangeVer.equals(topVer)) + grp.topology().own(locPart); + else // topology changed, rebalancing must be restarted + return; + } + finally { + if (reserved) + locPart.release(); + } + } + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { ctx.database().checkpointReadLock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 337553b..dc4bfe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -282,6 +283,14 @@ public class GridDhtPartitionDemander { final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId); + if (!grp.localWalEnabled()) + fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() { + @Override public void applyx(IgniteInternalFuture<Boolean> future) throws IgniteCheckedException { + if (future.get()) + ctx.walState().onGroupRebalanceFinished(grp.groupId(), assignments.topologyVersion()); + } + }); + if (!oldFut.isInitial()) oldFut.cancel(); else @@ -722,9 +731,7 @@ public class GridDhtPartitionDemander { // If message was last for this partition, // then we take ownership. 
if (last) { - top.own(part); - - fut.partitionDone(nodeId, p); + fut.partitionDone(nodeId, p, true); if (log.isDebugEnabled()) log.debug("Finished rebalancing partition: " + part); @@ -737,14 +744,14 @@ public class GridDhtPartitionDemander { } else { if (last) - fut.partitionDone(nodeId, p); + fut.partitionDone(nodeId, p, false); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - fut.partitionDone(nodeId, p); + fut.partitionDone(nodeId, p, false); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); @@ -762,7 +769,7 @@ public class GridDhtPartitionDemander { } for (Integer miss : supply.missed()) - fut.partitionDone(nodeId, miss); + fut.partitionDone(nodeId, miss, false); GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( supply.rebalanceId(), @@ -1064,8 +1071,11 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. * @param p Partition number. 
*/ - private void partitionDone(UUID nodeId, int p) { + private void partitionDone(UUID nodeId, int p, boolean updateState) { synchronized (this) { + if (updateState && grp.localWalEnabled()) + grp.topology().own(grp.topology().localPartition(p)); + if (isDone()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0d57d48..a21d98e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1702,6 +1702,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!grp.isLocal()) grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false); } + + cctx.walState().changeLocalStatesOnExchangeDone(exchId.topologyVersion()); } if (super.onDone(res, err)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 8f81708..d314d50 100755 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; import java.io.RandomAccessFile; +import java.io.Serializable; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -285,7 +286,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private static final String MBEAN_GROUP = "Persistent Store"; /** WAL marker prefix for meta store. */ - private static final String WAL_KEY_PREFIX = "grp-wal-disabled-"; + private static final String WAL_KEY_PREFIX = "grp-wal-"; + + /** WAL marker prefix for meta store. */ + private static final String WAL_GLOBAL_KEY_PREFIX = WAL_KEY_PREFIX + "disabled-"; + + /** WAL marker prefix for meta store. */ + private static final String WAL_LOCAL_KEY_PREFIX = WAL_KEY_PREFIX + "local-disabled-"; /** WAL marker predicate for meta store. */ private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new IgnitePredicate<String>() { @@ -385,7 +392,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs; /** Initially disabled cache groups. */ - private Collection<Integer> initiallyWalDisabledGrps; + private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>(); + + private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>(); /** File I/O factory for writing checkpoint markers. 
*/ private final FileIOFactory ioFactory; @@ -596,7 +605,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan applyLastUpdates(status, true); - initiallyWalDisabledGrps = walDisabledGroups(); + fillWalDisabledGroups(); notifyMetastorageReadyForRead(); } @@ -1974,7 +1983,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int applied = 0; WALPointer lastRead = null; - Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() : initiallyWalDisabledGrps; + Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() : + F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps); try (WALIterator it = cctx.wal().replay(status.endPtr)) { while (it.hasNextX()) { @@ -2228,7 +2238,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long start = U.currentTimeMillis(); int applied = 0; - Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() : initiallyWalDisabledGrps; + Collection<Integer> ignoreGrps = metastoreOnly ? 
Collections.emptySet() : + F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps); try (WALIterator it = cctx.wal().replay(status.startPtr)) { Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>(); @@ -4401,13 +4412,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public boolean walEnabled(int grpId) { - return !initiallyWalDisabledGrps.contains(grpId); + @Override public boolean walEnabled(int grpId, boolean local) { + if (local) + return !initiallyLocalWalDisabledGrps.contains(grpId); + else + return !initiallyGlobalWalDisabledGrps.contains(grpId); } /** {@inheritDoc} */ - @Override public void walEnabled(int grpId, boolean enabled) { - String key = walGroupIdToKey(grpId); + @Override public void walEnabled(int grpId, boolean enabled, boolean local) { + String key = walGroupIdToKey(grpId, local); checkpointReadLock(); @@ -4427,27 +4441,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @return List of initially WAL-disabled groups. + * */ - private Collection<Integer> walDisabledGroups() { + private void fillWalDisabledGroups() { MetaStorage meta = cctx.database().metaStorage(); try { Set<String> keys = meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet(); if (keys.isEmpty()) - return Collections.emptySet(); - - HashSet<Integer> res = new HashSet<>(keys.size()); + return; for (String key : keys) { - int grpId = walKeyToGroupId(key); + T2<Integer, Boolean> t2 = walKeyToGroupIdAndLocalFlag(key); - res.add(grpId); + if (t2.get2()) + initiallyLocalWalDisabledGrps.add(t2.get1()); + else + initiallyGlobalWalDisabledGrps.add(t2.get1()); } - return res; - } catch (IgniteCheckedException e) { throw new IgniteException("Failed to read cache groups WAL state.", e); @@ -4460,8 +4473,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param grpId Group ID. * @return Key. 
*/ - private static String walGroupIdToKey(int grpId) { - return WAL_KEY_PREFIX + grpId; + private static String walGroupIdToKey(int grpId, boolean local) { + if (local) + return WAL_LOCAL_KEY_PREFIX + grpId; + else + return WAL_GLOBAL_KEY_PREFIX + grpId; } /** @@ -4470,7 +4486,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param key Key. * @return Group ID. */ - private static int walKeyToGroupId(String key) { - return Integer.parseInt(key.substring(WAL_KEY_PREFIX.length())); + private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String key) { + if (key.startsWith(WAL_LOCAL_KEY_PREFIX)) + return new T2<>(Integer.parseInt(key.substring(WAL_LOCAL_KEY_PREFIX.length())), true); + else + return new T2<>(Integer.parseInt(key.substring(WAL_GLOBAL_KEY_PREFIX.length())), false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 8746dca..bf080b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -1035,7 +1035,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * @param grpId Group ID. * @return WAL enabled flag. 
*/ - public boolean walEnabled(int grpId) { + public boolean walEnabled(int grpId, boolean local) { return false; } @@ -1045,7 +1045,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * @param grpId Group id. * @param enabled flag. */ - public void walEnabled(int grpId, boolean enabled) { + public void walEnabled(int grpId, boolean enabled, boolean local) { // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 837f3d0..1c1b3e2 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -845,16 +845,22 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** {@inheritDoc} */ @Override public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc) { - if (grpDesc.persistenceEnabled() && !cctx.database().walEnabled(grpDesc.groupId())) { - File dir = cacheWorkDir(grpDesc.config()); + if (grpDesc.persistenceEnabled()) { + boolean localEnabled = cctx.database().walEnabled(grpDesc.groupId(), true); + boolean globalEnabled = cctx.database().walEnabled(grpDesc.groupId(), false); - assert dir.exists(); + if (!localEnabled || !globalEnabled) { + File dir = cacheWorkDir(grpDesc.config()); - boolean res = IgniteUtils.delete(dir); + assert dir.exists(); - assert res; + boolean res = IgniteUtils.delete(dir); - grpDesc.walEnabled(false); + assert res; + + if (!globalEnabled) + 
grpDesc.walEnabled(false); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java new file mode 100644 index 0000000..07653f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.OpenOption; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstractTest { + /** */ + private static boolean 
disableWalDuringRebalancing = true; + + /** */ + private static final AtomicReference<CountDownLatch> supplyMessageLatch = new AtomicReference<>(); + + /** */ + private static final AtomicReference<CountDownLatch> fileIOLatch = new AtomicReference<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(200 * 1024 * 1024) + .setInitialSize(200 * 1024 * 1024) + ) + // Test verifies checkpoint count, so it is essential that no checkpoint is triggered by timeout + .setCheckpointFrequency(999_999_999_999L) + .setFileIOFactory(new TestFileIOFactory(new DataStorageConfiguration().getFileIOFactory())) + ); + + cfg.setCacheConfiguration( + new CacheConfiguration(DEFAULT_CACHE_NAME) + // Test checks internal state before and after rebalance, so it is configured to be triggered manually + .setRebalanceDelay(-1) + ); + + cfg.setCommunicationSpi(new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) { + int grpId = ((GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message()).groupId(); + + if (grpId == CU.cacheId(DEFAULT_CACHE_NAME)) { + CountDownLatch latch0 = supplyMessageLatch.get(); + + if (latch0 != null) + try { + latch0.await(); + } + catch (InterruptedException ex) { + throw new IgniteException(ex); + } + } + } + + super.sendMessage(node, msg); + } + + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof 
GridDhtPartitionSupplyMessage) { + int grpId = ((GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message()).groupId(); + + if (grpId == CU.cacheId(DEFAULT_CACHE_NAME)) { + CountDownLatch latch0 = supplyMessageLatch.get(); + + if (latch0 != null) + try { + latch0.await(); + } + catch (InterruptedException ex) { + throw new IgniteException(ex); + } + } + } + + super.sendMessage(node, msg, ackC); + } + }); + + cfg.setConsistentId(igniteInstanceName); + + System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, + Boolean.toString(disableWalDuringRebalancing)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + CountDownLatch msgLatch = supplyMessageLatch.get(); + + if (msgLatch != null) { + while (msgLatch.getCount() > 0) + msgLatch.countDown(); + + supplyMessageLatch.set(null); + } + + CountDownLatch fileLatch = fileIOLatch.get(); + + if (fileLatch != null) { + while (fileLatch.getCount() > 0) + fileLatch.countDown(); + + fileIOLatch.set(null); + } + + stopAllGrids(); + + cleanPersistenceDir(); + + disableWalDuringRebalancing = true; + } + + /** + * @throws Exception If failed. + */ + public void testWalDisabledDuringRebalancing() throws Exception { + doTestSimple(); + } + + /** + * @throws Exception If failed. + */ + public void testWalNotDisabledIfParameterSetToFalse() throws Exception { + disableWalDuringRebalancing = false; + + doTestSimple(); + } + + /** + * @throws Exception If failed. 
+ */ + private void doTestSimple() throws Exception { + Ignite ignite = startGrids(3); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + IgniteEx newIgnite = startGrid(3); + + final GridCacheDatabaseSharedManager.CheckpointHistory cpHistory = + ((GridCacheDatabaseSharedManager)newIgnite.context().cache().context().database()).checkpointHistory(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !cpHistory.checkpoints().isEmpty(); + } + }, 10_000); + + U.sleep(10); // To ensure timestamp granularity. + + long newIgniteStartedTimestamp = System.currentTimeMillis(); + + ignite.cluster().setBaselineTopology(4); + + CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); + + assertEquals(!disableWalDuringRebalancing, grpCtx.walEnabled()); + + U.sleep(10); // To ensure timestamp granularity. + + long rebalanceStartedTimestamp = System.currentTimeMillis(); + + for (Ignite g : G.allGrids()) + g.cache(DEFAULT_CACHE_NAME).rebalance(); + + awaitPartitionMapExchange(); + + assertTrue(grpCtx.walEnabled()); + + U.sleep(10); // To ensure timestamp granularity. 
+ + long rebalanceFinishedTimestamp = System.currentTimeMillis(); + + for (Integer k = 0; k < 1000; k++) + assertEquals("k=" + k, k, cache.get(k)); + + int checkpointsBeforeNodeStarted = 0; + int checkpointsBeforeRebalance = 0; + int checkpointsAfterRebalance = 0; + + for (Long timestamp : cpHistory.checkpoints()) { + if (timestamp < newIgniteStartedTimestamp) + checkpointsBeforeNodeStarted++; + else if (timestamp >= newIgniteStartedTimestamp && timestamp < rebalanceStartedTimestamp) + checkpointsBeforeRebalance++; + else if (timestamp >= rebalanceStartedTimestamp && timestamp <= rebalanceFinishedTimestamp) + checkpointsAfterRebalance++; + } + + assertEquals(1, checkpointsBeforeNodeStarted); // checkpoint on start + assertEquals(0, checkpointsBeforeRebalance); + assertEquals(disableWalDuringRebalancing ? 1 : 0, checkpointsAfterRebalance); // checkpoint if WAL was re-activated + } + + /** + * @throws Exception If failed. + */ + public void testLocalAndGlobalWalStateInterdependence() throws Exception { + Ignite ignite = startGrids(3); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + IgniteEx newIgnite = startGrid(3); + + ignite.cluster().setBaselineTopology(ignite.cluster().nodes()); + + CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); + + assertFalse(grpCtx.walEnabled()); + + ignite.cluster().disableWal(DEFAULT_CACHE_NAME); + + for (Ignite g : G.allGrids()) + g.cache(DEFAULT_CACHE_NAME).rebalance(); + + awaitPartitionMapExchange(); + + assertFalse(grpCtx.walEnabled()); // WAL is globally disabled + + ignite.cluster().enableWal(DEFAULT_CACHE_NAME); + + assertTrue(grpCtx.walEnabled()); + } + + /** + * @throws Exception If failed. + */ + public void testParallelExchangeDuringRebalance() throws Exception { + doTestParallelExchange(supplyMessageLatch); + } + + /** + * @throws Exception If failed. 
+ */ + public void testParallelExchangeDuringCheckpoint() throws Exception { + doTestParallelExchange(fileIOLatch); + } + + /** + * @throws Exception If failed. + */ + private void doTestParallelExchange(AtomicReference<CountDownLatch> latchRef) throws Exception { + Ignite ignite = startGrids(3); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + IgniteEx newIgnite = startGrid(3); + + CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); + + CountDownLatch latch = new CountDownLatch(1); + + latchRef.set(latch); + + ignite.cluster().setBaselineTopology(ignite.cluster().nodes()); + + for (Ignite g : G.allGrids()) + g.cache(DEFAULT_CACHE_NAME).rebalance(); + + assertFalse(grpCtx.walEnabled()); + + // TODO : test with client node as well + startGrid(4); // Trigger exchange + + assertFalse(grpCtx.walEnabled()); + + latch.countDown(); + + assertFalse(grpCtx.walEnabled()); + + for (Ignite g : G.allGrids()) + g.cache(DEFAULT_CACHE_NAME).rebalance(); + + awaitPartitionMapExchange(); + + assertTrue(grpCtx.walEnabled()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDataClearedAfterRestartWithDisabledWal() throws Exception { + Ignite ignite = startGrid(0); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + IgniteEx newIgnite = startGrid(1); + + ignite.cluster().setBaselineTopology(2); + + CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); + + assertFalse(grpCtx.localWalEnabled()); + + stopGrid(1); + stopGrid(0); + + newIgnite = startGrid(1); + + newIgnite.cluster().active(true); + + newIgnite.cluster().setBaselineTopology(newIgnite.cluster().nodes()); + + cache = newIgnite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + assertFalse("k=" + k +", v=" + cache.get(k), cache.containsKey(k)); + } + + /** + * @throws Exception If failed. + */ + public void testWalNotDisabledAfterShrinkingBaselineTopology() throws Exception { + Ignite ignite = startGrids(4); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + for (int k = 0; k < 10_000; k++) + cache.put(k, k); + + for (Ignite g : G.allGrids()) { + CacheGroupContext grpCtx = ((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group(); + + assertTrue(grpCtx.walEnabled()); + } + + stopGrid(2); + + ignite.cluster().setBaselineTopology(5); + + for (Ignite g : G.allGrids()) { + CacheGroupContext grpCtx = ((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group(); + + assertTrue(grpCtx.walEnabled()); + + g.cache(DEFAULT_CACHE_NAME).rebalance(); + } + + awaitPartitionMapExchange(); + + for (Ignite g : G.allGrids()) { + CacheGroupContext grpCtx = ((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group(); + + assertTrue(grpCtx.walEnabled()); + } + } + + /** + * + */ + private static class TestFileIOFactory implements FileIOFactory { + /** */ + private final FileIOFactory delegate; + + /** + * @param delegate Delegate. 
+ */ + TestFileIOFactory(FileIOFactory delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return new TestFileIO(delegate.create(file)); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + return new TestFileIO(delegate.create(file, modes)); + } + } + + /** + * + */ + private static class TestFileIO implements FileIO { + /** */ + private final FileIO delegate; + + /** + * @param delegate Delegate. + */ + TestFileIO(FileIO delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + return delegate.position(); + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + delegate.position(newPosition); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf) throws IOException { + return delegate.read(destBuf); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf, long position) throws IOException { + return delegate.read(destBuf, position); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buf, int off, int len) throws IOException { + return delegate.read(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf) throws IOException { + CountDownLatch latch = fileIOLatch.get(); + + if (latch != null && Thread.currentThread().getName().contains("checkpoint")) + try { + latch.await(); + } + catch (InterruptedException ex) { + throw new IgniteException(ex); + } + + return delegate.write(srcBuf); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + CountDownLatch latch = fileIOLatch.get(); + + if (latch != null && Thread.currentThread().getName().contains("checkpoint")) + try { + latch.await(); + } + catch (InterruptedException ex) { + throw new IgniteException(ex); + 
} + + return delegate.write(srcBuf, position); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buf, int off, int len) throws IOException { + CountDownLatch latch = fileIOLatch.get(); + + if (latch != null && Thread.currentThread().getName().contains("checkpoint")) + try { + latch.await(); + } + catch (InterruptedException ex) { + throw new IgniteException(ex); + } + + delegate.write(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + return delegate.map(maxWalSegmentSize); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + delegate.force(); + } + + /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { + delegate.force(withMetadata); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + return delegate.size(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + delegate.clear(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + delegate.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8826fa13/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index f955b11..ede537e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchange import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest; import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest; +import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAllBaselineNodesOnlineFullApiSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOfflineBaselineNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest; @@ -135,5 +136,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class); suite.addTestSuite(IgnitePdsCorruptedStoreTest.class); + + suite.addTestSuite(LocalWalModeChangeDuringRebalancingSelfTest.class); } }
