This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0bbb53af88c IGNITE-20174 CacheDistributedGetFutureAdapter and it's
descendants initial cleanup (#10881)
0bbb53af88c is described below
commit 0bbb53af88c29a3af39fed8dddce378e5700ac79
Author: Anton Vinogradov <[email protected]>
AuthorDate: Wed Aug 9 13:01:10 2023 +0300
IGNITE-20174 CacheDistributedGetFutureAdapter and it's descendants initial
cleanup (#10881)
---
.../dht/CacheDistributedGetFutureAdapter.java | 73 ++++++++--------------
.../distributed/dht/GridPartitionedGetFuture.java | 33 ++++------
.../cache/distributed/near/GridNearGetFuture.java | 28 ++++-----
3 files changed, 48 insertions(+), 86 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 4abecc94547..9039b173e4a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -45,10 +45,8 @@ import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -75,10 +73,10 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
public static final int DFLT_MAX_REMAP_CNT = 3;
/** Maximum number of attempts to remap key to the same primary node. */
- protected static final int MAX_REMAP_CNT =
getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
+ private static final int MAX_REMAP_CNT =
getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
/** Remap count updater. */
- protected static final
AtomicIntegerFieldUpdater<CacheDistributedGetFutureAdapter> REMAP_CNT_UPD =
+ private static final
AtomicIntegerFieldUpdater<CacheDistributedGetFutureAdapter> REMAP_CNT_UPD =
AtomicIntegerFieldUpdater.newUpdater(CacheDistributedGetFutureAdapter.class,
"remapCnt");
/** Context. */
@@ -159,7 +157,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
boolean keepCacheObjects,
boolean recovery
) {
- super(CU.<K, V>mapsReducer(keys.size()));
+ super(CU.mapsReducer(keys.size()));
assert !F.isEmpty(keys);
@@ -174,7 +172,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
this.needVer = needVer;
this.keepCacheObjects = keepCacheObjects;
this.recovery = recovery;
- this.deploymentLdrId =
U.contextDeploymentClassLoaderId(cctx.kernalContext());
+ deploymentLdrId =
U.contextDeploymentClassLoaderId(cctx.kernalContext());
futId = IgniteUuid.randomUuid();
}
@@ -237,15 +235,9 @@ public abstract class CacheDistributedGetFutureAdapter<K,
V>
if (invalidNodes == Collections.<AffinityTopologyVersion, Map<Integer,
Set<ClusterNode>>>emptyMap())
invalidNodes = new HashMap<>();
- Map<Integer, Set<ClusterNode>> invalidNodeMap =
invalidNodes.get(topVer);
+ Map<Integer, Set<ClusterNode>> invalidNodeMap =
invalidNodes.computeIfAbsent(topVer, k -> new HashMap<>());
- if (invalidNodeMap == null)
- invalidNodes.put(topVer, invalidNodeMap = new HashMap<>());
-
- Set<ClusterNode> invalidNodeSet = invalidNodeMap.get(part);
-
- if (invalidNodeSet == null)
- invalidNodeMap.put(part, invalidNodeSet = new HashSet<>());
+ Set<ClusterNode> invalidNodeSet = invalidNodeMap.computeIfAbsent(part,
k -> new HashSet<>());
invalidNodeSet.add(node);
}
@@ -307,7 +299,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
if (f.node().id().equals(nodeId)) {
found = true;
- f.onNodeLeft(new ClusterTopologyCheckedException("Remote
node left grid (will retry): " + nodeId));
+ f.onNodeLeft();
}
}
@@ -358,18 +350,16 @@ public abstract class CacheDistributedGetFutureAdapter<K,
V>
/** {@inheritDoc} */
@Override public String toString() {
- Collection<String> futuresStrings = F.viewReadOnly(futures(), new
C1<IgniteInternalFuture<?>, String>() {
- @Override public String apply(IgniteInternalFuture<?> f) {
- if (isMini(f)) {
- AbstractMiniFuture mini = (AbstractMiniFuture)f;
-
- return "miniFuture([futId=" + mini.futureId() + ", node="
+ mini.node().id() +
- ", loc=" + mini.node().isLocal() +
- ", done=" + f.isDone() + "])";
- }
- else
- return f.getClass().getSimpleName() + " [loc=true, done="
+ f.isDone() + "]";
+ Collection<String> futuresStrings = F.viewReadOnly(futures(),
(IgniteInternalFuture<?> f) -> {
+ if (isMini(f)) {
+ AbstractMiniFuture mini = (AbstractMiniFuture)f;
+
+ return "miniFuture([futId=" + mini.futureId() + ", node=" +
mini.node().id() +
+ ", loc=" + mini.node().isLocal() +
+ ", done=" + f.isDone() + "])";
}
+ else
+ return f.getClass().getSimpleName() + " [loc=true, done=" +
f.isDone() + "]";
});
return S.toString(CacheDistributedGetFutureAdapter.class, this,
@@ -414,7 +404,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
this.node = node;
this.keys = keys;
this.topVer = topVer;
- this.postProcessingClos = CU.createBackupPostProcessingClosure(
+ postProcessingClos = CU.createBackupPostProcessingClosure(
topVer, log, cctx, null, expiryPlc, readThrough &&
cctx.readThroughConfigured(), skipVals);
}
@@ -432,13 +422,6 @@ public abstract class CacheDistributedGetFutureAdapter<K,
V>
return node;
}
- /**
- * @return Keys.
- */
- public Collection<KeyCacheObject> keys() {
- return keys.keySet();
- }
-
/**
* Factory methond for generate request associated with this
miniFuture.
*
@@ -474,9 +457,9 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
}
/**
- * @param e Failure exception.
+ *
*/
- public synchronized void onNodeLeft(ClusterTopologyCheckedException e)
{
+ public synchronized void onNodeLeft() {
if (remapped)
return;
@@ -489,7 +472,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
if (!canRemap) {
map(keys.keySet(), F.t(node, keys), topVer);
- onDone(Collections.<K, V>emptyMap());
+ onDone(Collections.emptyMap());
}
else {
long maxTopVer = Math.max(topVer.topologyVersion() + 1,
cctx.discovery().topologyVersion());
@@ -503,7 +486,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
// Remap.
map(keys.keySet(), F.t(node, keys), f.get());
- onDone(Collections.<K, V>emptyMap());
+ onDone(Collections.emptyMap());
}
catch (IgniteCheckedException ex) {
CacheDistributedGetFutureAdapter.this.onDone(ex);
@@ -536,11 +519,8 @@ public abstract class CacheDistributedGetFutureAdapter<K,
V>
log.debug("Remapping mini get future [invalidParts=" +
invalidParts + ", fut=" + this + ']');
if (!canRemap) {
- map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
- @Override public boolean apply(KeyCacheObject key) {
- return
invalidParts.contains(cctx.affinity().partition(key));
- }
- }), F.t(node, keys), topVer);
+ map(F.view(keys.keySet(), (KeyCacheObject key) ->
invalidParts.contains(cctx.affinity().partition(key))),
+ F.t(node, keys), topVer);
postProcessResult(res);
@@ -558,11 +538,8 @@ public abstract class CacheDistributedGetFutureAdapter<K,
V>
AffinityTopologyVersion topVer = fut.get();
// This will append new futures to compound list.
- map(F.view(keys.keySet(), new P1<KeyCacheObject>()
{
- @Override public boolean apply(KeyCacheObject
key) {
- return
invalidParts.contains(cctx.affinity().partition(key));
- }
- }), F.t(node, keys), topVer);
+ map(F.view(keys.keySet(), (KeyCacheObject key) ->
invalidParts.contains(cctx.affinity().partition(key))),
+ F.t(node, keys), topVer);
postProcessResult(res);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d0756437c09..acc22d8b865 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -47,12 +47,10 @@ import
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -61,13 +59,13 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAdapter<K, V> {
/** Transaction label. */
- protected final String txLbl;
+ private final String txLbl;
/** */
- protected final MvccSnapshot mvccSnapshot;
+ private final MvccSnapshot mvccSnapshot;
/** Explicit predefined single mapping (backup or primary). */
- protected final ClusterNode affNode;
+ private final ClusterNode affNode;
/**
* @param cctx Context.
@@ -194,18 +192,11 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
if (fut.initialVersion().after(topVer) || (fut.exchangeActions()
!= null && fut.exchangeActions().hasStop()))
fut = cctx.shared().exchange().lastFinishedFuture();
else {
- fut.listen(new
IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (fut.error() != null)
- onDone(fut.error());
- else {
- cctx.closures().runLocalSafe(new
GridPlainRunnable() {
- @Override public void run() {
- map(keys, mapped, topVer);
- }
- }, true);
- }
- }
+ fut.listen((IgniteInternalFuture<AffinityTopologyVersion>
fut0) -> {
+ if (fut0.error() != null)
+ onDone(fut0.error());
+ else
+ cctx.closures().runLocalSafe(() -> map(keys, mapped,
topVer), true);
});
return;
@@ -322,7 +313,7 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
catch (IgniteCheckedException e) {
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
- miniFut.onNodeLeft((ClusterTopologyCheckedException)e);
+ miniFut.onNodeLeft();
else
miniFut.onResult(e);
}
@@ -409,10 +400,8 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
ClusterNode node,
Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings
) {
- LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
-
- if (old == null)
- mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+ LinkedHashMap<KeyCacheObject, Boolean> old =
+ mappings.computeIfAbsent(node, k -> new LinkedHashMap<>(3, 1f));
old.put(key, false);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 11159bbc278..e363f695c81 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -285,11 +285,11 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
else {
registrateFutureInMvccManager(this);
- MiniFuture miniFuture = new MiniFuture(n, mappedKeys, saved,
topVer);
+ MiniFuture miniFut = new MiniFuture(n, mappedKeys, saved,
topVer);
- GridNearGetRequest req = miniFuture.createGetRequest(futId);
+ GridNearGetRequest req = miniFut.createGetRequest(futId);
- add(miniFuture); // Append new future.
+ add(miniFut); // Append new future.
try {
cctx.io().send(n, req, cctx.ioPolicy());
@@ -297,9 +297,9 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
catch (IgniteCheckedException e) {
// Fail the whole thing.
if (e instanceof ClusterTopologyCheckedException)
-
miniFuture.onNodeLeft((ClusterTopologyCheckedException)e);
+ miniFut.onNodeLeft();
else
- miniFuture.onResult(e);
+ miniFut.onResult(e);
}
}
}
@@ -431,10 +431,8 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
if (!addRdr && tx.readCommitted() &&
!tx.writeSet().contains(cctx.txKey(key)))
addRdr = true;
- LinkedHashMap<KeyCacheObject, Boolean> old =
mappings.get(affNode);
-
- if (old == null)
- mappings.put(affNode, old = new LinkedHashMap<>(3,
1f));
+ LinkedHashMap<KeyCacheObject, Boolean> old =
+ mappings.computeIfAbsent(affNode, k -> new
LinkedHashMap<>(3, 1f));
old.put(key, addRdr);
}
@@ -581,8 +579,8 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary,
false, null);
V val0 = needVer ?
(V)new EntryGetResult(!skipVals ?
- (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false,
null) :
- (V)Boolean.TRUE, ver) :
+ cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false,
null) :
+ Boolean.TRUE, ver) :
!skipVals ?
(V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false,
null) :
(V)Boolean.TRUE;
@@ -622,7 +620,7 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
) {
boolean empty = F.isEmpty(keys);
- Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new
GridLeanMap<K, V>(keys.size());
+ Map<K, V> map = empty ? Collections.emptyMap() : new
GridLeanMap<>(keys.size());
if (!empty) {
boolean atomic = cctx.atomic();
@@ -689,12 +687,10 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
/**
* @param keys Keys.
* @param saved Saved entries.
- * @param topVer Topology version.
*/
private void releaseEvictions(
Collection<KeyCacheObject> keys,
- Map<KeyCacheObject, GridNearCacheEntry> saved,
- AffinityTopologyVersion topVer
+ Map<KeyCacheObject, GridNearCacheEntry> saved
) {
for (KeyCacheObject key : keys) {
GridNearCacheEntry entry = saved.get(key);
@@ -767,7 +763,7 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Map<K, V> res, @Nullable
Throwable err) {
if (super.onDone(res, err)) {
- releaseEvictions(keys.keySet(), savedEntries, topVer);
+ releaseEvictions(keys.keySet(), savedEntries);
return true;
}