This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f45c470 IGNITE-13441 Fixes for TC stabilization. - Fixes #8291.
f45c470 is described below
commit f45c470e12c6187afca16996d9c01ddb9fa79869
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Thu Oct 1 11:09:12 2020 +0300
IGNITE-13441 Fixes for TC stabilization. - Fixes #8291.
Signed-off-by: Alexey Scherbakov <[email protected]>
---
.../cache/CacheAffinitySharedManager.java | 32 ++--
.../cache/GridCacheLocalConcurrentMap.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 12 +-
.../cache/GridCachePartitionExchangeManager.java | 3 +-
.../processors/cache/GridCacheProcessor.java | 2 +
.../distributed/GridDistributedCacheEntry.java | 2 +-
.../distributed/GridDistributedTxMapping.java | 2 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 18 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 6 +-
.../dht/preloader/GridDhtPreloader.java | 24 ++-
.../preloader/IgniteDhtDemandedPartitionsMap.java | 8 +
.../dht/topology/GridClientPartitionTopology.java | 5 +
.../dht/topology/GridDhtLocalPartition.java | 78 +++++++--
.../dht/topology/GridDhtPartitionTopology.java | 9 +
.../dht/topology/GridDhtPartitionTopologyImpl.java | 43 +++++
.../dht/topology/PartitionsEvictManager.java | 15 +-
.../cache/distributed/near/GridNearTxRemote.java | 5 +-
.../cache/extras/GridCacheEntryExtras.java | 2 +-
.../cache/transactions/IgniteTxEntry.java | 9 +-
.../cache/transactions/IgniteTxHandler.java | 114 ++++++------
.../cache/transactions/IgniteTxManager.java | 3 +-
.../IgniteCacheNearRestartRollbackSelfTest.java | 2 -
.../dht/topology/BlockedEvictionsTest.java | 8 +-
.../topology/DelayedOwningDuringExchangeTest.java | 191 +++++++++++++++++++++
.../EvictionWhilePartitionGroupIsReservedTest.java | 8 +-
...MovingPartitionIsEvictedDuringClearingTest.java | 5 +-
...reloadingRestartWhileClearingPartitionTest.java | 2 +-
.../RentingPartitionIsOwnedDuringEvictionTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite7.java | 2 +
29 files changed, 487 insertions(+), 127 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a44cd96..05fa72e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -344,6 +344,18 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
}
/**
+ * @param grpId Group id.
+ * @param partId Partition id.
+ *
+ * @return {@code True} if this node waits for the partition rebalance.
+ */
+ public boolean waitRebalance(int grpId, int partId) {
+ synchronized (mux) {
+ return waitInfo != null && waitInfo.waitGrps.getOrDefault(grpId,
Collections.emptySet()).contains(partId);
+ }
+ }
+
+ /**
* @return {@code true} if rebalance expected.
*/
public boolean rebalanceRequired() {
@@ -2395,7 +2407,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
new WaitRebalanceInfo(fut.exchangeId().topologyVersion()) :
new
WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
- final Collection<ClusterNode> aliveNodes =
fut.context().events().discoveryCache().serverNodes();
+ final Collection<ClusterNode> evtNodes =
fut.context().events().discoveryCache().serverNodes();
final Map<Integer, Map<Integer, List<T>>> assignment = new
ConcurrentHashMap<>();
@@ -2427,7 +2439,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
List<ClusterNode> newNodes = newAssignment.get(p);
List<ClusterNode> curNodes = curAssignment.get(p);
- assert aliveNodes.containsAll(newNodes) : "Invalid new
assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
+ assert evtNodes.containsAll(newNodes) : "Invalid new
assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
", nodes=" + newNodes +
", topVer=" +
fut.context().events().discoveryCache().version() +
", evts=" + fut.context().events().events() + "]";
@@ -2437,7 +2449,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
List<ClusterNode> newNodes0 = null;
- assert newPrimary == null ||
aliveNodes.contains(newPrimary) : "Invalid new primary [" +
+ assert newPrimary == null || evtNodes.contains(newPrimary)
: "Invalid new primary [" +
"grp=" + desc.cacheOrGroupName() +
", node=" + newPrimary +
", topVer=" + topVer + ']';
@@ -2453,14 +2465,14 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
newNodes0 = new ArrayList<>(curNodes.size());
for (ClusterNode node : curNodes) {
- if (aliveNodes.contains(node))
+ if (evtNodes.contains(node))
newNodes0.add(node);
}
}
else if (curPrimary != null &&
!curPrimary.equals(newPrimary)) {
GridDhtPartitionState state =
top.partitionState(newPrimary.id(), p);
- if (aliveNodes.contains(curPrimary)) {
+ if (evtNodes.contains(curPrimary)) {
if (state != OWNING) {
newNodes0 =
latePrimaryAssignment(grpHolder.affinity(),
p,
@@ -2475,7 +2487,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
ClusterNode curNode = curNodes.get(i);
if (top.partitionState(curNode.id(), p) ==
OWNING &&
- aliveNodes.contains(curNode)) {
+ evtNodes.contains(curNode)) {
newNodes0 =
latePrimaryAssignment(grpHolder.affinity(),
p,
curNode,
@@ -2488,7 +2500,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
if (newNodes0 == null) {
for (ClusterNode owner : owners) {
- if (aliveNodes.contains(owner)) {
+ if (evtNodes.contains(owner)) {
newNodes0 =
latePrimaryAssignment(grpHolder.affinity(),
p,
owner,
@@ -2503,12 +2515,12 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
}
}
- // This will happen if no primary is changed but some
backups still need to be rebalanced.
+ // This will happen if no primary has changed but some
backups still need to be rebalanced.
if (!owners.isEmpty() && !owners.containsAll(newNodes) &&
!top.lostPartitions().contains(p))
waitRebalanceInfo.add(grpHolder.groupId(), p,
newNodes);
if (newNodes0 != null) {
- assert aliveNodes.containsAll(newNodes0) : "Invalid
late assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
+ assert evtNodes.containsAll(newNodes0) : "Invalid late
assignment [grp=" + grpHolder.aff.cacheOrGroupName() +
", nodes=" + newNodes +
", topVer=" +
fut.context().events().discoveryCache().version() +
", evts=" + fut.context().events().events() + "]";
@@ -2937,7 +2949,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
/** {@inheritDoc} */
@Override public String toString() {
- return "WaitRebalanceInfo [topVer=" + topVer + ", grps=" +
waitGrps + ']';
+ return "WaitRebalanceInfo [topVer=" + topVer + ", grps=" +
waitGrps.keySet() + ']';
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
index 8d2a1e3..b4c93cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java
@@ -41,7 +41,7 @@ public class GridCacheLocalConcurrentMap extends
GridCacheConcurrentMapImpl {
this.cacheId = cctx.cacheId();
this.entryMap = new CacheMapHolder(cctx,
- new ConcurrentHashMap<KeyCacheObject, GridCacheMapEntry>(initCap,
0.75f, Runtime.getRuntime().availableProcessors() * 2));
+ new ConcurrentHashMap<>(initCap, 0.75f,
Runtime.getRuntime().availableProcessors() * 2));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 23ea34f..33e15a6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -1335,9 +1335,6 @@ public class GridCacheMvccManager extends
GridCacheSharedManagerAdapter {
recheck(entry);
}
- if (log.isDebugEnabled())
- log.debug("After rechecking finished future: " + this);
-
if (pendingLocks.isEmpty()) {
if (exchLog.isDebugEnabled())
exchLog.debug("Finish lock future is done: " + this);
@@ -1361,13 +1358,8 @@ public class GridCacheMvccManager extends
GridCacheSharedManagerAdapter {
if (cands != null) {
synchronized (cands) {
- for (Iterator<GridCacheMvccCandidate> it =
cands.iterator(); it.hasNext(); ) {
- GridCacheMvccCandidate cand = it.next();
-
- // Check exclude ID again, as key could have been
reassigned.
- if (cand.removed())
- it.remove();
- }
+ // Check exclude ID again, as key could have been
reassigned.
+ cands.removeIf(GridCacheMvccCandidate::removed);
if (cands.isEmpty())
pendingLocks.remove(entry.txKey());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d4b33af..fde6895 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -291,7 +292,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
private ExchangeLatchManager latchMgr;
/** List of exchange aware components. */
- private final List<PartitionsExchangeAware> exchangeAwareComps = new
ArrayList<>();
+ private final List<PartitionsExchangeAware> exchangeAwareComps = new
CopyOnWriteArrayList<>();
/** Histogram of PME durations. */
private volatile HistogramMetricImpl durationHistogram;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d792e64..6465a2a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -550,6 +550,8 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
grp.removeIOStatistic();
}
+ sharedCtx.evict().cleanupRemovedGroup(grp.groupId());
+
cachesInfo.cleanupRemovedGroup(grp.groupId());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 46aca81..c371f26 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -63,7 +63,7 @@ public class GridDistributedCacheEntry extends
GridCacheMapEntry {
private void refreshRemotes() {
GridCacheMvcc mvcc = mvccExtras();
- rmts = mvcc == null ? Collections.<GridCacheMvccCandidate>emptyList()
: mvcc.remoteCandidates();
+ rmts = mvcc == null ? Collections.emptyList() :
mvcc.remoteCandidates();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 0eba942..44fc036 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -46,7 +46,7 @@ public class GridDistributedTxMapping {
/** Mapped node. */
@GridToStringExclude
- private ClusterNode primary;
+ private final ClusterNode primary;
/** Mapped backup nodes. */
private volatile Set<UUID> backups;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index bb6a1db..2c12695 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -31,7 +31,6 @@ import
org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteSingleStateImpl;
@@ -307,21 +306,16 @@ public class GridDhtTxRemote extends
GridDistributedTxRemoteAdapter {
GridCacheContext cacheCtx = entry.context();
- try {
- GridDhtCacheEntry cached = cacheCtx.dht().entryExx(entry.key(),
topologyVersion());
+ GridDhtCacheEntry cached = cacheCtx.dht().entryExx(entry.key(),
topologyVersion());
- checkInternal(entry.txKey());
+ checkInternal(entry.txKey());
- // Initialize cache entry.
- entry.cached(cached);
+ // Initialize cache entry.
+ entry.cached(cached);
- txState.addWriteEntry(entry.txKey(), entry);
+ txState.addWriteEntry(entry.txKey(), entry);
- addExplicit(entry);
- }
- catch (GridDhtInvalidPartitionException e) {
- addInvalidPartition(cacheCtx.cacheId(), e.partition());
- }
+ addExplicit(entry);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 24ba686..e9f4d05 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2002,7 +2002,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionBegin();
try {
- locksFut.get(waitTimeout, TimeUnit.MILLISECONDS);
+ locksFut.get(50, TimeUnit.MILLISECONDS);
break;
}
@@ -2030,6 +2030,10 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT,
false))
U.dumpThreads(log);
}
+
+ // Sometimes FinishLockFuture is not rechecked causing
frozen PME.
+ // Will recheck every 50 milliseconds.
+ cctx.mvcc().recheckPendingLocks();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index dee2747..925f7a8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -282,11 +283,26 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
}
if (!assignments.isEmpty()) {
-
ctx.database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
+ if (exchFut != null && exchFut.rebalanced()) {
+ GridDhtPartitionDemandMessage first =
assignments.values().iterator().next();
+
+ GridDhtLocalPartition locPart =
grp.topology().localPartition(first.partitions().all().iterator().next());
+
+ SB buf = new SB(1024);
+
+ buf.a("Unexpected rebalance on rebalanced cluster:
assignments=");
+ buf.a(assignments);
+ buf.a(", locPart=");
- assert exchFut == null || !exchFut.rebalanced() :
- "Unexpected rebalance on rebalanced cluster " +
- "[top=" + topVer + ", grp=" + grp.groupId() + ",
assignments=" + assignments + "]";
+ if (locPart != null)
+ locPart.dumpDebugInfo(buf);
+ else
+ buf.a("NA");
+
+ throw new AssertionError(buf.toString());
+ }
+
+
ctx.database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
}
return assignments;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
index 5417d40..348024c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
@@ -19,10 +19,12 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -174,6 +176,12 @@ public class IgniteDhtDemandedPartitionsMap implements
Serializable {
return historical;
}
+ /** */
+ public Collection<Integer> all() {
+ return F.concat(false, fullSet(), historicalSet());
+ }
+
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteDhtDemandedPartitionsMap.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 8e5498d..ad0a52b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1473,4 +1473,9 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public boolean rent(int p) {
+ return false;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index f19526e..34046f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableSet;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
@@ -26,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -62,8 +66,10 @@ import
org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.util.deque.FastSizeDeque;
@@ -161,7 +167,7 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
/** Set if failed to move partition to RENTING state due to reservations,
to be checked when
* reservation is released. */
- private volatile long delayedRentingTopVer;
+ private volatile boolean delayedRenting;
/** */
private final AtomicReference<GridFutureAdapter<?>> finishFutRef = new
AtomicReference<>();
@@ -194,7 +200,14 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
cacheMaps = new IntRWHashMap<>();
}
else {
- singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(),
createEntriesMap());
+ GridCacheContext cctx = grp.singleCacheContext();
+
+ if (cctx.isNear())
+ cctx = cctx.near().dht().context();
+
+ singleCacheEntryMap = ctx.kernalContext().resource().resolve(
+ new CacheMapHolder(cctx, createEntriesMap()));
+
cacheMaps = null;
}
@@ -287,7 +300,11 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
if (hld != null)
return hld;
- CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld =
new CacheMapHolder(cctx, createEntriesMap()));
+ if (cctx.isNear())
+ cctx = cctx.near().dht().context();
+
+ CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld =
ctx.kernalContext().resource().resolve(
+ new CacheMapHolder(cctx, createEntriesMap())));
if (old != null)
hld = old;
@@ -681,16 +698,12 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
return rent;
}
- // Store current topology version to check on partition release.
- delayedRentingTopVer =
ctx.exchange().readyAffinityVersion().topologyVersion();
-
if (tryInvalidateGroupReservations() && getReservations(state0) == 0
&& casState(state0, RENTING)) {
- delayedRentingTopVer = 0;
-
- // Evict asynchronously, as the 'rent' method may be called
- // from within write locks on local partition.
+ // Evict asynchronously, as the 'rent' method may be called from
within write locks on local partition.
clearAsync();
}
+ else
+ delayedRenting = true;
return rent;
}
@@ -699,9 +712,8 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
* Continue clearing if it was delayed before due to reservation and
topology version not changed.
*/
public void tryContinueClearing() {
- if (delayedRentingTopVer != 0 &&
- delayedRentingTopVer ==
ctx.exchange().readyAffinityVersion().topologyVersion())
- rent();
+ if (delayedRenting)
+ group().topology().rent(id);
}
/**
@@ -1385,4 +1397,44 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
return S.toString(RemovedEntryHolder.class, this);
}
}
+
+ /**
+ * Collects detailed info about the partition.
+ *
+ * @param buf Buffer.
+ */
+ public void dumpDebugInfo(SB buf) {
+ GridDhtPartitionTopology top = grp.topology();
+ AffinityTopologyVersion topVer = top.readyTopologyVersion();
+
+ if (!topVer.initialized()) {
+ buf.a(toString());
+
+ return;
+ }
+
+ final int limit = 3;
+
+ buf.a("[topVer=").a(topVer);
+ buf.a(", lastChangeTopVer=").a(top.lastTopologyChangeVersion());
+ buf.a(",
waitRebalance=").a(ctx.kernalContext().cache().context().affinity().waitRebalance(grp.groupId(),
id));
+ buf.a(", nodes=").a(F.nodeIds(top.nodes(id,
topVer)).stream().limit(limit).collect(Collectors.toList()));
+ buf.a(", locPart=").a(toString());
+
+ NavigableSet<AffinityTopologyVersion> versions =
grp.affinity().cachedVersions();
+
+ int i = 5;
+
+ Iterator<AffinityTopologyVersion> iter = versions.descendingIterator();
+
+ while (--i >= 0 && iter.hasNext()) {
+ AffinityTopologyVersion topVer0 = iter.next();
+ buf.a(", ver").a(i).a('=').a(topVer0);
+
+ Collection<UUID> nodeIds =
F.nodeIds(grp.affinity().cachedAffinity(topVer0).get(id));
+ buf.a(",
affOwners").a(i).a('=').a(nodeIds.stream().limit(limit).collect(Collectors.toList()));
+ }
+
+ buf.a(']');
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 2ad9960..6071a7c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -443,4 +443,13 @@ public interface GridDhtPartitionTopology {
* @param updateRebalanceVer {@code True} if need check rebalance state.
*/
public void onExchangeDone(GridDhtPartitionsExchangeFuture fut,
AffinityAssignment assignment, boolean updateRebalanceVer);
+
+ /**
+ * Rents a partition and updates a partition map if the partition was
switched to RENTING.
+ *
+ * @param p Partition ID.
+ *
+ * @return {@code True} if the partition was switched to RENTING.
+ */
+ public boolean rent(int p);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 7f471da..fe8eca4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -2458,6 +2458,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
U.warn(log, "Partitions have been scheduled for
rebalancing due to outdated update counter "
+ "[grp=" + grp.cacheOrGroupName()
+ + ", readyTopVer=" + readyTopVer
+ ", topVer=" + exchFut.initialVersion()
+ ", nodeId=" + nodeId
+ ", partsFull=" + S.compact(rebalancedParts)
@@ -3204,6 +3205,48 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
return false;
}
+ /** {@inheritDoc} */
+ @Override public boolean rent(int p) {
+ ctx.database().checkpointReadLock();
+
+ try {
+ lock.writeLock().lock();
+
+ try {
+ // Do not rent if PME in progress, will be rented later if
applicable.
+ if (lastTopChangeVer.after(readyTopVer))
+ return false;
+
+ GridDhtLocalPartition locPart = localPartition(p);
+
+ GridDhtPartitionState state0 = locPart.state();
+
+ if (locPart == null || state0 == RENTING || state0 == EVICTED
|| partitionLocalNode(p, readyTopVer))
+ return false;
+
+ locPart.rent();
+
+ GridDhtPartitionState state = locPart.state();
+
+ if (state == RENTING && state != state0) {
+ long updateSeq = this.updateSeq.incrementAndGet();
+
+ updateLocal(p, state0, updateSeq, readyTopVer);
+
+ ctx.exchange().scheduleResendPartitions();
+ }
+
+ return true;
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+
/**
* Checks consistency after all operations.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index e348854..b545d4b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -95,7 +95,7 @@ public class PartitionsEvictManager extends
GridCacheSharedManagerAdapter {
* @param grp Group.
*/
public void onCacheGroupStarted(CacheGroupContext grp) {
- evictionGroupsMap.remove(grp.groupId());
+ // No-op.
}
/**
@@ -106,6 +106,7 @@ public class PartitionsEvictManager extends
GridCacheSharedManagerAdapter {
* @param grp Group context.
*/
public void onCacheGroupStopped(CacheGroupContext grp) {
+ // Must keep context in the map to avoid race with subsequent clearing
request after the call to this method.
GroupEvictionContext grpEvictionCtx =
evictionGroupsMap.computeIfAbsent(grp.groupId(), p -> new
GroupEvictionContext(grp));
@@ -134,6 +135,9 @@ public class PartitionsEvictManager extends
GridCacheSharedManagerAdapter {
try {
int grpId = grp.groupId();
+ if (cctx.cache().cacheGroup(grpId) == null)
+ return new GridFinishedFuture<>(new
CacheStoppedException(grp.cacheOrGroupName()));
+
GroupEvictionContext grpEvictionCtx =
evictionGroupsMap.computeIfAbsent(
grpId, k -> new GroupEvictionContext(grp));
@@ -250,6 +254,15 @@ public class PartitionsEvictManager extends
GridCacheSharedManagerAdapter {
}
/**
+ * Cleans up group eviction context when it's safe.
+ *
+ * @param grpId Group id.
+ */
+ public void cleanupRemovedGroup(int grpId) {
+ evictionGroupsMap.remove(grpId);
+ }
+
+ /**
*
*/
private class GroupEvictionContext implements EvictionContext {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index c8e442c..ec9f511 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -321,8 +321,7 @@ public class GridNearTxRemote extends
GridDistributedTxRemoteAdapter {
GridCacheContext cacheCtx = entry.context();
- if (!cacheCtx.isNear())
- cacheCtx = cacheCtx.dht().near().context();
+ assert cacheCtx.isNear() : entry;
GridNearCacheEntry cached = cacheCtx.near().peekExx(entry.key());
@@ -333,7 +332,7 @@ public class GridNearTxRemote extends
GridDistributedTxRemoteAdapter {
}
else {
try {
- cached.unswap();
+ // Unswap is no-op for near cache.
CacheObject val = cached.peek();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/extras/GridCacheEntryExtras.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/extras/GridCacheEntryExtras.java
index 2317b51..1438b15 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/extras/GridCacheEntryExtras.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/extras/GridCacheEntryExtras.java
@@ -31,7 +31,7 @@ public interface GridCacheEntryExtras {
@Nullable public GridCacheMvcc mvcc();
/**
- * @param mvcc NVCC.
+ * @param mvcc MVCC.
* @return Updated extras.
*/
public GridCacheEntryExtras mvcc(GridCacheMvcc mvcc);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 7dd8416..02ee036 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -945,7 +946,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
if (cacheCtx == null)
throw new CacheInvalidStateException(
"Failed to perform cache operation (cache is stopped),
cacheId=" + cacheId);
-
+
if (cacheCtx.isNear() && !near)
cacheCtx = cacheCtx.near().dht().context();
else if (!cacheCtx.isNear() && near)
@@ -954,6 +955,12 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
this.ctx = cacheCtx;
}
+ CacheObjectValueContext coctx = this.ctx.cacheObjectContext();
+
+ if (coctx == null)
+ throw new CacheInvalidStateException(
+ "Failed to perform cache operation (cache is stopped),
cacheId=" + cacheId);
+
// Unmarshal transform closure anyway if it exists.
if (transformClosBytes != null && entryProcessorsCol == null)
entryProcessorsCol = U.unmarshal(ctx, transformClosBytes,
U.resolveClassLoader(clsLdr, ctx.gridConfig()));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a3a3cd7..7bb93fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -19,7 +19,9 @@ package
org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
@@ -1778,53 +1780,60 @@ public class IgniteTxHandler {
txCounters.updateCounters(req.updateCounters());
}
- if (!tx.isSystemInvalidate()) {
- int idx = 0;
+ Set<GridDhtLocalPartition> reservedParts = new HashSet<>();
- for (IgniteTxEntry entry : req.writes()) {
- GridCacheContext cacheCtx = entry.context();
+ try {
+ if (!tx.isSystemInvalidate()) {
+ int idx = 0;
- int part = cacheCtx.affinity().partition(entry.key());
+ for (IgniteTxEntry entry : req.writes()) {
+ GridCacheContext cacheCtx = entry.context();
- GridDhtLocalPartition locPart =
cacheCtx.topology().localPartition(part,
- req.topologyVersion(),
- false);
+ int part = cacheCtx.affinity().partition(entry.key());
- if (locPart != null && locPart.reserve()) {
try {
- tx.addWrite(entry, ctx.deploy().globalLoader());
-
- // Entry will be invalidated if a partition was
moved to RENTING.
- if (locPart.state() == RENTING)
- continue;
+ GridDhtLocalPartition locPart =
cacheCtx.topology().localPartition(part,
+ req.topologyVersion(),
+ false);
- if (txCounters != null) {
- Long cntr =
txCounters.generateNextCounter(entry.cacheId(), part);
+ // Avoid enlisting to invalid partition.
+ boolean reserved = locPart != null &&
reservedParts.contains(locPart);
- if (cntr != null) // Counter is null if entry
is no-op.
- entry.updateCounter(cntr);
+ if (!reserved) {
+ if ((reserved = locPart != null &&
locPart.reserve()))
+ reservedParts.add(locPart);
}
- if (isNearEnabled(cacheCtx) &&
req.invalidateNearEntry(idx))
- invalidateNearEntry(cacheCtx, entry.key(),
req.version());
+ if (reserved) {
+ tx.addWrite(entry,
ctx.deploy().globalLoader());
+
+ if (txCounters != null) {
+ Long cntr =
txCounters.generateNextCounter(entry.cacheId(), part);
- if (req.needPreloadKey(idx)) {
- GridCacheEntryEx cached = entry.cached();
+ if (cntr != null) // Counter is null if
entry is no-op.
+ entry.updateCounter(cntr);
+ }
- if (cached == null)
- cached =
cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+ if (isNearEnabled(cacheCtx) &&
req.invalidateNearEntry(idx))
+ invalidateNearEntry(cacheCtx, entry.key(),
req.version());
- GridCacheEntryInfo info = cached.info();
+ if (req.needPreloadKey(idx)) {
+ GridCacheEntryEx cached = entry.cached();
- if (info != null && !info.isNew() &&
!info.isDeleted())
- res.addPreloadEntry(info);
- }
+ if (cached == null)
+ cached =
cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+ GridCacheEntryInfo info = cached.info();
- if (cacheCtx.readThroughConfigured() &&
- !entry.skipStore() &&
- entry.op() == TRANSFORM &&
- entry.oldValueOnPrimary() &&
- !entry.hasValue()) {
+ if (info != null && !info.isNew() &&
!info.isDeleted())
+ res.addPreloadEntry(info);
+ }
+
+ if (cacheCtx.readThroughConfigured() &&
+ !entry.skipStore() &&
+ entry.op() == TRANSFORM &&
+ entry.oldValueOnPrimary() &&
+ !entry.hasValue()) {
while (true) {
try {
GridCacheEntryEx cached =
entry.cached();
@@ -1853,36 +1862,35 @@ public class IgniteTxHandler {
if (val != null)
entry.readValue(val);
- break;
- }
- catch (GridCacheEntryRemovedException
ignored) {
- if (log.isDebugEnabled())
- log.debug("Got entry removed
exception, will retry: " + entry.txKey());
+ break;
+ }
+ catch (GridCacheEntryRemovedException
ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got entry removed
exception, will retry: " + entry.txKey());
-
entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
+
entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
+ }
}
}
}
+ else
+ tx.addInvalidPartition(cacheCtx.cacheId(),
part);
}
catch (GridDhtInvalidPartitionException e) {
- tx.addInvalidPartition(cacheCtx.cacheId(),
e.partition());
-
- tx.clearEntry(entry.txKey());
- }
- finally {
- locPart.release();
+ tx.addInvalidPartition(cacheCtx.cacheId(), part);
}
- }
- else
- tx.addInvalidPartition(cacheCtx.cacheId(), part);
- idx++;
+ idx++;
+ }
}
- }
- // Prepare prior to reordering, so the pending locks added
- // in prepare phase will get properly ordered as well.
- tx.prepareRemoteTx();
+ // Prepare prior to reordering, so the pending locks added
+ // in prepare phase will get properly ordered as well.
+ tx.prepareRemoteTx();
+ }
+ finally {
+ reservedParts.forEach(GridDhtLocalPartition::release);
+ }
if (req.last()) {
assert !F.isEmpty(req.transactionNodes()) :
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 9e8f0f3..cff7616 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3269,7 +3269,8 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
? new GridCompoundFuture<>() : null;
for (final IgniteInternalTx tx : activeTransactions()) {
- if ((tx.near() && !tx.local()) || (tx.storeWriteThrough()
&& tx.masterNodeIds().contains(evtNodeId))) {
+ if ((tx.near() && !tx.local() &&
tx.originatingNodeId().equals(evtNodeId))
+ || (tx.storeWriteThrough() &&
tx.masterNodeIds().contains(evtNodeId))) {
// Invalidate transactions.
salvageTx(tx, RECOVERY_FINISH);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
index 22b2ff7..46375f3 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -46,7 +46,6 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
-import org.junit.Ignore;
import org.junit.Test;
import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -104,7 +103,6 @@ public class IgniteCacheNearRestartRollbackSelfTest extends
GridCommonAbstractTe
* @throws Exception If failed.
*/
@Test
- @Ignore("IGNITE-13441")
public void testRestarts() throws Exception {
startGrids(3);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/BlockedEvictionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/BlockedEvictionsTest.java
index 1ef7356..ab728b1 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/BlockedEvictionsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/BlockedEvictionsTest.java
@@ -72,7 +72,7 @@ public class BlockedEvictionsTest extends
GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(4)
+ 1);
+ cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(3)
+ 2);
cfg.setSystemThreadPoolSize(sysPoolSize);
cfg.setConsistentId(igniteInstanceName);
@@ -268,12 +268,12 @@ public class BlockedEvictionsTest extends
GridCommonAbstractTest {
PartitionsEvictManager mgr =
grid(0).context().cache().context().evict();
- // Group eviction context should remain in map. TODO leak ?
+ // Group eviction context should remain in map.
Map evictionGroupsMap = U.field(mgr, "evictionGroupsMap");
- assertEquals(1, evictionGroupsMap.size());
+ assertEquals("Group context must be cleaned up", 0,
evictionGroupsMap.size());
- IgniteCache<Object, Object> cache =
grid(0).getOrCreateCache(cacheConfiguration());
+ grid(0).getOrCreateCache(cacheConfiguration());
assertEquals(0, evictionGroupsMap.size());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
new file mode 100644
index 0000000..ad560d5
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/DelayedOwningDuringExchangeTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests a scenario when a temporary owned partition is released during PME.
+ * The eviction should not start because this partition can be assigned as
primary.
+ */
+@WithSystemProperty(key = "IGNITE_PRELOAD_RESEND_TIMEOUT", value = "0")
+public class DelayedOwningDuringExchangeTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+ setCacheMode(CacheMode.PARTITIONED).
+ setBackups(0).
+ setAffinity(new RendezvousAffinityFunction(false, 64)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDelayedOwning_1() throws Exception {
+ testDelayedRenting(0, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDelayedOwning_2() throws Exception {
+ testDelayedRenting(0, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDelayedOwning_3() throws Exception {
+ testDelayedRenting(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDelayedOwning_4() throws Exception {
+ testDelayedRenting(1, 1);
+ }
+
+ /**
+ * @param idx Index.
+ * @param mode Mode.
+ */
+ private void testDelayedRenting(int idx, int mode) throws Exception {
+ final int nodes = 2;
+
+ IgniteEx crd = startGrids(nodes);
+
+ awaitPartitionMapExchange();
+
+ IgniteEx testGrid = grid(idx);
+
+ CountDownLatch l1 = new CountDownLatch(1);
+ CountDownLatch l2 = new CountDownLatch(1);
+
+
testGrid.context().cache().context().exchange().registerExchangeAwareComponent(new
PartitionsExchangeAware() {
+ @Override public void
onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ wait(fut, 0);
+ }
+
+ @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ wait(fut, 1);
+ }
+
+ private void wait(GridDhtPartitionsExchangeFuture fut, int mode0) {
+ if (fut.initialVersion().equals(new
AffinityTopologyVersion(nodes + 2, 0)) && mode == mode0) {
+ l1.countDown();
+
+ try {
+ assertTrue(U.await(l2, 30_000, TimeUnit.MILLISECONDS));
+ } catch (IgniteInterruptedCheckedException e) {
+ fail(X.getFullStackTrace(e));
+ }
+ }
+ }
+ });
+
+ int p0 = evictingPartitionsAfterJoin(testGrid,
testGrid.cache(DEFAULT_CACHE_NAME), 1).get(0);
+
+ testGrid.cache(DEFAULT_CACHE_NAME).put(p0, 0);
+
+ GridDhtPartitionTopology top0 =
testGrid.cachex(DEFAULT_CACHE_NAME).context().topology();
+ GridDhtLocalPartition evictPart = top0.localPartition(p0);
+ assertTrue(evictPart.reserve());
+
+ IgniteEx joined = startGrid(nodes);
+
+ GridDhtPartitionTopology top1 =
joined.cachex(DEFAULT_CACHE_NAME).context().topology();
+
+ assertTrue(GridTestUtils.waitForCondition(
+ () -> top0.nodes(p0, new AffinityTopologyVersion(nodes + 1,
1)).size() == 2, 5_000));
+
+ assertTrue(GridTestUtils.waitForCondition(
+ () -> top1.nodes(p0, new AffinityTopologyVersion(nodes + 1,
1)).size() == 2, 5_000));
+
+ Collection<ClusterNode> affOwners =
testGrid.affinity(DEFAULT_CACHE_NAME).mapPartitionToPrimaryAndBackups(p0);
+ assertEquals(1, affOwners.size());
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ stopGrid(nodes);
+ }
+ });
+
+ assertTrue(U.await(l1, 30_000, TimeUnit.MILLISECONDS));
+
+ evictPart.release();
+
+ doSleep(1000);
+
+ l2.countDown();
+
+ awaitPartitionMapExchange(true, true, null);
+
+ fut.get();
+
+ assertEquals(0, testGrid.cache(DEFAULT_CACHE_NAME).get(p0));
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionWhilePartitionGroupIsReservedTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionWhilePartitionGroupIsReservedTest.java
index a66f961..75c385c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionWhilePartitionGroupIsReservedTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionWhilePartitionGroupIsReservedTest.java
@@ -149,9 +149,7 @@ public class EvictionWhilePartitionGroupIsReservedTest
extends GridCommonAbstrac
@Override public boolean apply() {
GridDhtLocalPartition locPart = top.localPartition(p);
- long delayedRentingTopVer = U.field(locPart,
"delayedRentingTopVer");
-
- return delayedRentingTopVer > 0;
+ return U.field(locPart, "delayedRenting");
}
}, 5_000));
}
@@ -159,6 +157,10 @@ public class EvictionWhilePartitionGroupIsReservedTest
extends GridCommonAbstrac
grpR.release();
+ // Necessary to guaranatee a call to rent().
+ assertTrue(GridTestUtils.waitForCondition(() ->
+
top.readyTopologyVersion().equals(top.lastTopologyChangeVersion()), 5_000));
+
assertEquals(clientAfter, grpR.reserve());
awaitPartitionMapExchange(true, true, null);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/MovingPartitionIsEvictedDuringClearingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/MovingPartitionIsEvictedDuringClearingTest.java
index 995e578..a660533 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/MovingPartitionIsEvictedDuringClearingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/MovingPartitionIsEvictedDuringClearingTest.java
@@ -43,7 +43,7 @@ import static
org.apache.ignite.internal.processors.cache.distributed.dht.topolo
* Tests a scenario when a clearing partition is attempted to evict after a
call to
* {@link GridDhtPartitionTopology#tryFinishEviction(GridDhtLocalPartition)}.
*
- * Such a scenario can leave a partition in RENTING state until the next
exchange, but it's look acceptable.
+ * Such a scenario can leave a partition in RENTING state until the next
exchange. It's actually acceptable behavior.
*/
@WithSystemProperty(key = "IGNITE_PRELOAD_RESEND_TIMEOUT", value = "0")
public class MovingPartitionIsEvictedDuringClearingTest extends
GridCommonAbstractTest {
@@ -51,7 +51,8 @@ public class MovingPartitionIsEvictedDuringClearingTest
extends GridCommonAbstra
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(4)
+ 1);
+ // Need at least 2 threads in pool to avoid deadlock on clearing.
+ cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(3)
+ 2);
cfg.setConsistentId(igniteInstanceName);
DataStorageConfiguration dsCfg = new
DataStorageConfiguration().setWalSegmentSize(4 * 1024 * 1024);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PreloadingRestartWhileClearingPartitionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PreloadingRestartWhileClearingPartitionTest.java
index 0be3c6b..c318d46 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PreloadingRestartWhileClearingPartitionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PreloadingRestartWhileClearingPartitionTest.java
@@ -52,7 +52,7 @@ public class PreloadingRestartWhileClearingPartitionTest
extends GridCommonAbstr
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(4)
+ 1);
+ cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(3)
+ 2);
cfg.setConsistentId(igniteInstanceName);
DataStorageConfiguration dsCfg = new
DataStorageConfiguration().setWalSegmentSize(4 * 1024 * 1024);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/RentingPartitionIsOwnedDuringEvictionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/RentingPartitionIsOwnedDuringEvictionTest.java
index 4172054..675896b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/RentingPartitionIsOwnedDuringEvictionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/RentingPartitionIsOwnedDuringEvictionTest.java
@@ -55,7 +55,7 @@ public class RentingPartitionIsOwnedDuringEvictionTest
extends GridCommonAbstrac
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(4)
+ 1);
+ cfg.setRebalanceThreadPoolSize(ThreadLocalRandom.current().nextInt(3)
+ 2);
cfg.setConsistentId(igniteInstanceName);
if (persistence) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 5d2ba60..3b942fb 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtP
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtPreloadWaitForBackupsWithPersistenceTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.BlockedEvictionsTest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.DelayedOwningDuringExchangeTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.EvictionWhilePartitionGroupIsReservedTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.MovingPartitionIsEvictedDuringClearingTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PreloadingRestartWhileClearingPartitionTest;
@@ -157,6 +158,7 @@ public class IgniteCacheTestSuite7 {
GridTestUtils.addTestIfNeeded(suite,
PreloadingRestartWhileClearingPartitionTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
MovingPartitionIsEvictedDuringClearingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
EvictionWhilePartitionGroupIsReservedTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
DelayedOwningDuringExchangeTest.class, ignoredTests);
return suite;
}