Repository: ignite
Updated Branches:
  refs/heads/ignite-perftest-merge e58604a4a -> 54f943462
Ignite-perftest - Fixed compilation and backward compatibility.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54f94346
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54f94346
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54f94346

Branch: refs/heads/ignite-perftest-merge
Commit: 54f943462bf714787e229c0f8663164c560902cf
Parents: e58604a
Author: Alexey Goncharuk <[email protected]>
Authored: Fri Nov 13 15:28:45 2015 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Fri Nov 13 15:28:45 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java | 49 ++++++++++++++++++++
 .../GridDistributedTxPrepareRequest.java | 10 ++--
 .../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
 .../dht/colocated/GridDhtColocatedCache.java | 2 +-
 4 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..4c4074e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import
java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -133,6 +136,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); + /** */ + private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers = + new ConcurrentSkipListMap<>(); + /** */ private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<>(AffinityTopologyVersion.NONE); @@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Gets minimum node version for the given topology version. + * + * @param topVer Topology version to get minimum node version for. + * @return Minimum node version. 
+ */ + public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) { + IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer); + + return vers == null ? cctx.localNode().version() : vers.get1(); + } + + /** + * Gets maximum node version for the given topology version. + * + * @param topVer Topology version to get maximum node version for. + * @return Maximum node version. + */ + public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) { + IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer); + + return vers == null ? cctx.localNode().version() : vers.get2(); + } + + /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -832,6 +863,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']'); + IgniteProductVersion minVer = cctx.localNode().version(); + IgniteProductVersion maxVer = cctx.localNode().version(); + + for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) { + IgniteProductVersion ver = node.version(); + + if (ver.compareTo(minVer) < 0) + minVer = ver; + + if (ver.compareTo(maxVer) > 0) + maxVer = ver; + } + + nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer)); + + for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet()) + nodeVers.remove(oldVer); + if (err == null) { while (true) { AffinityTopologyVersion readyVer = readyTopVer.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index ba251e4..abd6818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -56,6 +57,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** */ private static final long serialVersionUID = 0L; + /** Version in which direct marshalling of tx nodes was introduced. */ + public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0"); + /** Collection to message converter. */ public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) { @@ -327,9 +331,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (txNodesMsg == null) txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG); - // TODO backward compatibility. -// if (txNodes != null) -// txNodesBytes = ctx.marshaller().marshal(txNodes); + // Marshal txNodes only if there is a node in topology with an older version. 
+ if (txNodes != null && ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) + txNodesBytes = ctx.marshaller().marshal(txNodes); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3069afd..1ba1c0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -672,7 +672,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (IgniteCheckedException | GridDistributedLockCancelledException e) { + catch (GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 83c220d..7131aa5 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (IgniteCheckedException | GridDistributedLockCancelledException e) { + catch (GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
