ignite-3479
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0196b00 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0196b00 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0196b00 Branch: refs/heads/ignite-3479 Commit: e0196b003f628eeaf540ebec9760bc6dcafaee4c Parents: 4e7f19e Author: sboikov <[email protected]> Authored: Wed Sep 27 17:52:18 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 27 17:58:34 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 4 +- .../ignite/internal/GridKernalContextImpl.java | 10 +- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../processors/cache/GridCacheProcessor.java | 1 - .../cache/GridCacheSharedContext.java | 4 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 2 - .../GridNearPessimisticTxPrepareFuture.java | 8 +- .../mvcc/CacheCoordinatorsDiscoveryData.java | 42 + .../cache/mvcc/CacheCoordinatorsProcessor.java | 1120 ++++++++++++++++++ .../mvcc/CacheCoordinatorsSharedManager.java | 1100 ----------------- .../wal/reader/StandaloneGridKernalContext.java | 4 +- .../cache/tree/AbstractDataInnerIO.java | 6 +- .../cache/tree/AbstractDataLeafIO.java | 6 +- .../processors/cache/tree/CacheDataTree.java | 4 +- .../cache/tree/CacheIdAwareDataInnerIO.java | 4 +- .../cache/tree/CacheIdAwareDataLeafIO.java | 4 +- .../processors/cache/tree/DataInnerIO.java | 4 +- .../processors/cache/tree/DataLeafIO.java | 4 +- .../processors/cache/tree/MvccDataRow.java | 4 +- .../processors/cache/tree/SearchRow.java | 4 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 4 +- 21 files changed, 1201 insertions(+), 1142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 971be7e..88251aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -648,5 +648,5 @@ public interface GridKernalContext extends Iterable<GridComponent> { /** * @return Cache mvcc coordinator processor. */ - public CacheCoordinatorsSharedManager coordinators(); + public CacheCoordinatorsProcessor coordinators(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1715887..86c0adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -49,7 +49,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -285,7 +285,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** Cache mvcc coordinators. */ @GridToStringExclude - private CacheCoordinatorsSharedManager coordProc; + private CacheCoordinatorsProcessor coordProc; /** */ @GridToStringExclude @@ -584,8 +584,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable poolProc = (PoolProcessor) comp; else if (comp instanceof GridMarshallerMappingProcessor) mappingProc = (GridMarshallerMappingProcessor)comp; - else if (comp instanceof CacheCoordinatorsSharedManager) - coordProc = (CacheCoordinatorsSharedManager)comp; + else if (comp instanceof CacheCoordinatorsProcessor) + coordProc = (CacheCoordinatorsProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -841,7 +841,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public CacheCoordinatorsSharedManager coordinators() { + @Override public CacheCoordinatorsProcessor coordinators() { return coordProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 7b833bc..9a6972b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -114,7 +114,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -938,7 +938,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. try { - startProcessor(new CacheCoordinatorsSharedManager(ctx)); + startProcessor(new CacheCoordinatorsProcessor(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e52c56c..2af7fd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 1cdee39..f4e4d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -44,7 +44,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; @@ -773,7 +773,7 @@ public class GridCacheSharedContext<K, V> { /** * @return Cache mvcc coordinator manager. */ - public CacheCoordinatorsSharedManager coordinators() { + public CacheCoordinatorsProcessor coordinators() { return kernalCtx.coordinators(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index cee5d9b..e4a7141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -37,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index c6192d9..0664b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -36,7 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; @@ -490,9 +490,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA ", loc=" + ((MiniFuture)f).primary().isLocal() + ", done=" + f.isDone() + "]"; } - else if (f instanceof CacheCoordinatorsSharedManager.MvccVersionFuture) { - CacheCoordinatorsSharedManager.MvccVersionFuture crdFut = - (CacheCoordinatorsSharedManager.MvccVersionFuture)f; + else if (f instanceof CacheCoordinatorsProcessor.MvccVersionFuture) { + CacheCoordinatorsProcessor.MvccVersionFuture crdFut = + (CacheCoordinatorsProcessor.MvccVersionFuture)f; return "[mvccCrdNode=" + crdFut.crdId + ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) + http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java new file mode 100644 index 0000000..39baec9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.io.Serializable; + +/** + * + */ +public class CacheCoordinatorsDiscoveryData implements Serializable { + /** */ + private MvccCoordinator crd; + + /** + * @param crd Coordinator. + */ + public CacheCoordinatorsDiscoveryData(MvccCoordinator crd) { + this.crd = crd; + } + + /** + * @return Current coordinator. + */ + public MvccCoordinator coordinator() { + return crd; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java new file mode 100644 index 0000000..5f5da20 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -0,0 +1,1120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.Nullable; +import org.jsr166.LongAdder8; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; + +/** + * + */ +public class CacheCoordinatorsProcessor extends GridProcessorAdapter { + /** */ + public static final long COUNTER_NA = 0L; + + /** */ + private static final boolean STAT_CNTRS = false; + + /** */ + private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR; + + /** */ + private static final byte MSG_POLICY = SYSTEM_POOL; + + /** */ + private volatile MvccCoordinator curCrd; + + /** */ + private final AtomicLong mvccCntr = new AtomicLong(1L); + + /** */ + private final GridAtomicLong committedCntr = new GridAtomicLong(1L); + + /** */ + private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); + + /** */ + private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); + + /** */ + private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries(); + + /** */ + private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); + + /** */ + private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); + + /** */ + private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>(); + + /** */ + private final AtomicLong futIdCntr = new AtomicLong(); + + /** */ + private final CountDownLatch crdLatch = new CountDownLatch(1); + + /** Topology version when local node was assigned as coordinator. */ + private long crdVer; + + /** */ + private StatCounter[] statCntrs; + + /** + * @param ver1 First version. + * @param ver2 Second version. + * @return Comparison result. + */ + public static int compareVersions(MvccCoordinatorVersion ver1, MvccCoordinatorVersion ver2) { + assert ver1 != null; + assert ver2 != null; + + int cmp = Long.compare(ver1.coordinatorVersion(), ver2.coordinatorVersion()); + + if (cmp != 0) + return cmp; + + return Long.compare(ver1.counter(), ver2.counter()); + } + + /** */ + private CacheCoordinatorsDiscoveryData discoData; + + /** + * @param ctx Context. + */ + public CacheCoordinatorsProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.CACHE_CRD_PROC; + } + + /** {@inheritDoc} */ + @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + Integer cmpId = DiscoveryDataExchangeType.CACHE_CRD_PROC.ordinal(); + + if (!dataBag.commonDataCollectedFor(cmpId)) + dataBag.addGridCommonData(cmpId, discoData); + } + + /** {@inheritDoc} */ + @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { + discoData = (CacheCoordinatorsDiscoveryData)data.commonData(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + statCntrs = new StatCounter[7]; + + statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); + statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); + statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); + statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); + statCntrs[4] = new StatCounter("TotalRequests"); + statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); + statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); + + ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), + EVT_NODE_FAILED, EVT_NODE_LEFT); + + ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); + } + + /** + * @param log Logger. + */ + public void dumpStatistics(IgniteLogger log) { + if (STAT_CNTRS) { + log.info("Mvcc coordinator statistics: "); + + for (StatCounter cntr : statCntrs) + cntr.dumpInfo(log); + } + } + + /** + * @param tx Transaction. + * @return Counter. + */ + public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) { + assert ctx.localNodeId().equals(currentCoordinatorId()); + + return assignTxCounter(tx.nearXidVersion(), 0L); + } + + /** + * @param crd Coordinator. + * @param lsnr Response listener. + * @param txVer Transaction version. + * @return Counter request future. + */ + public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd, + MvccResponseListener lsnr, + GridCacheVersion txVer) { + assert !ctx.localNodeId().equals(crd.nodeId()); + + MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), + crd.nodeId(), + lsnr); + + verFuts.put(fut.id, fut); + + try { + ctx.io().sendToGridTopic(crd.nodeId(), + MSG_TOPIC, + new CoordinatorTxCounterRequest(fut.id, txVer), + MSG_POLICY); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + + return fut; + } + + /** + * @param crd Coordinator. + * @param mvccVer Query version. + */ + public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) { + assert crd != null; + + long trackCntr = mvccVer.counter(); + + MvccLongList txs = mvccVer.activeTransactions(); + + if (txs != null) { + for (int i = 0; i < txs.size(); i++) { + long txId = txs.get(i); + + if (txId < trackCntr) + trackCntr = txId; + } + } + + Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) : + new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr); + + try { + ctx.io().sendToGridTopic(crd.nodeId(), + MSG_TOPIC, + msg, + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e); + } + } + + /** + * @param crd Coordinator. + * @return Counter request future. + */ + public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) { + assert crd != null; + + // TODO IGNITE-3478: special case for local? + MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null); + + verFuts.put(fut.id, fut); + + try { + ctx.io().sendToGridTopic(crd.nodeId(), + MSG_TOPIC, + new CoordinatorQueryVersionRequest(fut.id), + MSG_POLICY); + } + catch (IgniteCheckedException e) { + if (verFuts.remove(fut.id) != null) + fut.onDone(e); + } + + return fut; + } + + /** + * @param crdId Coordinator ID. + * @param txs Transaction IDs. + * @return Future. + */ + public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) { + assert crdId != null; + assert txs != null && txs.size() > 0; + + // TODO IGNITE-3478: special case for local? + + WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false); + + ackFuts.put(fut.id, fut); + + try { + ctx.io().sendToGridTopic(crdId, + MSG_TOPIC, + new CoordinatorWaitTxsRequest(fut.id, txs), + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(); // No need to ack, finish without error. + } + catch (IgniteCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(e); + } + + return fut; + } + + /** + * @param crd Coordinator. + * @param mvccVer Transaction version. + * @return Acknowledge future. + */ + public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) { + assert crd != null; + assert mvccVer != null; + + WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true); + + ackFuts.put(fut.id, fut); + + try { + ctx.io().sendToGridTopic(crd, + MSG_TOPIC, + new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(); // No need to ack, finish without error. + } + catch (IgniteCheckedException e) { + if (ackFuts.remove(fut.id) != null) + fut.onDone(e); + } + + return fut; + } + + /** + * @param crdId Coordinator node ID. + * @param mvccVer Transaction version. + */ + public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) { + CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter()); + + msg.skipResponse(true); + + try { + ctx.io().sendToGridTopic(crdId, + MSG_TOPIC, + msg, + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crdId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crdId + ']', e); + } + } + + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ + private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); + + return; + } + + MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId()); + + if (STAT_CNTRS) + statCntrs[0].update(res.size()); + + try { + ctx.io().sendToGridTopic(node, + MSG_TOPIC, + res, + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e); + } + } + + /** + * + * @param nodeId Sender node ID. + * @param msg Message. + */ + private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); + + return; + } + + MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId()); + + try { + ctx.io().sendToGridTopic(node, + MSG_TOPIC, + res, + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); + + onNodeFailed(nodeId); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); + + onQueryDone(res.counter()); + } + } + + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ + private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) { + MvccVersionFuture fut = verFuts.remove(msg.futureId()); + + if (fut != null) { + if (STAT_CNTRS) + statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000); + + fut.onResponse(msg); + } + else { + if (ctx.discovery().alive(nodeId)) + U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); + else if (log.isDebugEnabled()) + log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); + } + } + + /** + * @param msg Message. + */ + private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) { + onQueryDone(msg.counter()); + } + + /** + * @param nodeId Node ID. + * @param msg Message. + */ + private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) { + prevCrdQueries.onQueryDone(nodeId, msg); + } + + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ + private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { + onTxDone(msg.txCounter()); + + if (STAT_CNTRS) + statCntrs[2].update(); + + if (!msg.skipResponse()) { + try { + ctx.io().sendToGridTopic(nodeId, + MSG_TOPIC, + new CoordinatorFutureResponse(msg.futureId()), + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); + } + } + } + + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ + private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { + WaitAckFuture fut = ackFuts.remove(msg.futureId()); + + if (fut != null) { + if (STAT_CNTRS) { + StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6]; + + cntr.update((System.nanoTime() - fut.startTime) * 1000); + } + + fut.onResponse(); + } + else { + if (ctx.discovery().alive(nodeId)) + U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); + else if (log.isDebugEnabled()) + log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); + } + } + + /** + * @param txId Transaction ID. + * @return Counter. + */ + private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { + assert crdVer != 0; + + long nextCtr = mvccCntr.incrementAndGet(); + + // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? + MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); + + for (Long txVer : activeTxs.keySet()) + res.addTx(txVer); + + Object old = activeTxs.put(nextCtr, txId); + + assert old == null : txId; + + long cleanupVer; + + if (prevCrdQueries.previousQueriesDone()) { + cleanupVer = committedCntr.get() - 1; + + for (Long qryVer : activeQueries.keySet()) { + if (qryVer <= cleanupVer) + cleanupVer = qryVer - 1; + } + } + else + cleanupVer = -1; + + res.init(futId, crdVer, nextCtr, cleanupVer); + + return res; + } + + /** + * @param txCntr Counter assigned to transaction. + */ + private void onTxDone(Long txCntr) { + GridFutureAdapter fut; // TODO IGNITE-3478. + + GridCacheVersion ver = activeTxs.remove(txCntr); + + assert ver != null; + + committedCntr.setIfGreater(txCntr); + + fut = waitTxFuts.remove(txCntr); + + if (fut != null) + fut.onDone(); + } + + static boolean increment(AtomicInteger cntr) { + for (;;) { + int current = cntr.get(); + + if (current == 0) + return false; + + if (cntr.compareAndSet(current, current + 1)) + return true; + } + } + + /** + * @param qryNodeId Node initiated query. + * @return Counter for query. + */ + private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { + assert crdVer != 0; + + MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); + + Long mvccCntr; + + for(;;) { + mvccCntr = committedCntr.get(); + + Long trackCntr = mvccCntr; + + for (Long txVer : activeTxs.keySet()) { + if (txVer < trackCntr) + trackCntr = txVer; + + res.addTx(txVer); + } + + registerActiveQuery(trackCntr); + + if (committedCntr.get() == mvccCntr) + break; + else { + res.resetTransactionsCount(); + + onQueryDone(trackCntr); + } + } + + res.init(futId, crdVer, mvccCntr, COUNTER_NA); + + return res; + } + + private void registerActiveQuery(Long cntr) { + for (;;) { + AtomicInteger qryCnt = activeQueries.get(cntr); + + if (qryCnt != null) { + boolean inc = increment(qryCnt); + + if (!inc) { + activeQueries.remove(mvccCntr, qryCnt); + + continue; + } + } + else { + qryCnt = new AtomicInteger(1); + + if (activeQueries.putIfAbsent(cntr, qryCnt) != null) + continue; + } + + break; + } + } + + private void onNodeFailed(UUID nodeId) { + // TODO + } + + /** + * @param mvccCntr Query counter. + */ + private void onQueryDone(long mvccCntr) { + AtomicInteger cntr = activeQueries.get(mvccCntr); + + assert cntr != null : mvccCntr; + + int left = cntr.decrementAndGet(); + + assert left >= 0 : left; + + if (left == 0) { + boolean rmv = activeQueries.remove(mvccCntr, cntr); + + assert rmv; + } + } + + /** + * @param msg Message. + */ + private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) { + statCntrs[5].update(); + + GridLongList txs = msg.transactions(); + + GridCompoundFuture resFut = null; + + for (int i = 0; i < txs.size(); i++) { + Long txId = txs.get(i); + + WaitTxFuture fut = waitTxFuts.get(txId); + + if (fut == null) { + WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId)); + + if (old != null) + fut = old; + } + + if (!activeTxs.containsKey(txId)) + fut.onDone(); + + if (!fut.isDone()) { + if (resFut == null) + resFut = new GridCompoundFuture(); + + resFut.add(fut); + } + } + + if (resFut != null) + resFut.markInitialized(); + + if (resFut == null || resFut.isDone()) + sendFutureResponse(nodeId, msg); + else { + resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + sendFutureResponse(nodeId, msg); + } + }); + } + } + + /** + * @param nodeId + * @param msg + */ + private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) { + try { + ctx.io().sendToGridTopic(nodeId, + MSG_TOPIC, + new CoordinatorFutureResponse(msg.futureId()), + MSG_POLICY); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); + } + } + + public MvccCoordinator currentCoordinator() { + return curCrd; + } + + public UUID currentCoordinatorId() { + MvccCoordinator curCrd = this.curCrd; + + return curCrd != null ? curCrd.nodeId() : null; + } + + /** + * @param topVer Cache affinity version (used for assert). + * @return Coordinator. + */ + public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) { + MvccCoordinator crd = curCrd; + + // Assert coordinator did not already change. + assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : + "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']'; + + return crd; + } + + /** + * @param discoCache Discovery snapshot. + * @return New coordinator. + */ + public MvccCoordinator reassignCoordinator(DiscoCache discoCache) { + assert curCrd == null || !F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd; + + if (!discoCache.serverNodes().isEmpty()) { + ClusterNode node = discoCache.serverNodes().get(0); + + curCrd = new MvccCoordinator(node.id(), + discoCache.version().topologyVersion(), + discoCache.version()); + + log.info("Assigned mvcc coordinator: " + curCrd); + } + else { + curCrd = null; + + log.info("New mvcc coordinator was not assigned [topVer=" + discoCache.version() + ']'); + } + + return curCrd; + } + + /** + * @param nodeId Node ID + * @param activeQueries + */ + public void processClientActiveQueries(UUID nodeId, + @Nullable Map<MvccCounter, Integer> activeQueries) { + prevCrdQueries.processClientActiveQueries(nodeId, activeQueries); + } + + /** + * @param topVer Topology version. + * @param activeQueries Current queries. + */ + public void initCoordinator(AffinityTopologyVersion topVer, + DiscoCache discoCache, + Map<UUID, Map<MvccCounter, Integer>> activeQueries) + { + assert ctx.localNodeId().equals(curCrd.nodeId()); + + log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + + ", topVer=" + topVer + ']'); + + crdVer = topVer.topologyVersion(); + + prevCrdQueries.init(activeQueries, discoCache, ctx.discovery()); + + crdLatch.countDown(); + } + + /** + * + */ + public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> { + /** */ + private final Long id; + + /** */ + private MvccResponseListener lsnr; + + /** */ + public final UUID crdId; + + /** */ + long startTime; + + /** + * @param id Future ID. + * @param crdId Coordinator node ID. + */ + MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) { + this.id = id; + this.crdId = crdId; + this.lsnr = lsnr; + + if (STAT_CNTRS) + startTime = System.nanoTime(); + } + + /** + * @param res Response. + */ + void onResponse(MvccCoordinatorVersionResponse res) { + assert res.counter() != COUNTER_NA; + + if (lsnr != null) + lsnr.onMvccResponse(crdId, res); + + onDone(res); + } + + void onError(IgniteCheckedException err) { + if (verFuts.remove(id) != null) { + if (lsnr != null) + lsnr.onMvccError(err); + + onDone(err); + } + } + + /** + * @param nodeId Failed node ID. + */ + void onNodeLeft(UUID nodeId) { + if (crdId.equals(nodeId)) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + + "coordinator failed: " + nodeId); + + onError(err); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']'; + } + } + + /** + * + */ + private class WaitAckFuture extends GridFutureAdapter<Void> { + /** */ + private final long id; + + /** */ + private final UUID crdId; + + /** */ + long startTime; + + /** */ + final boolean ackTx; + + /** + * @param id Future ID. + * @param crdId Coordinator node ID. + */ + WaitAckFuture(long id, UUID crdId, boolean ackTx) { + this.id = id; + this.crdId = crdId; + this.ackTx = ackTx; + + if (STAT_CNTRS) + startTime = System.nanoTime(); + } + + /** + * + */ + void onResponse() { + onDone(); + } + + /** + * @param nodeId Failed node ID. + */ + void onNodeLeft(UUID nodeId) { + if (crdId.equals(nodeId) && verFuts.remove(id) != null) + onDone(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']'; + } + } + + /** + * + */ + private class CacheCoordinatorNodeFailListener implements GridLocalEventListener { + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent : evt; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID nodeId = discoEvt.eventNode().id(); + + for (MvccVersionFuture fut : verFuts.values()) + fut.onNodeLeft(nodeId); + + for (WaitAckFuture fut : ackFuts.values()) + fut.onNodeLeft(nodeId); + + prevCrdQueries.onNodeLeft(nodeId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheCoordinatorDiscoveryListener[]"; + } + } + /** + * + */ + private class CoordinatorMessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (STAT_CNTRS) + statCntrs[4].update(); + + MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg; + + if (msg0.waitForCoordinatorInit()) { + if (crdVer == 0) { + try { + U.await(crdLatch); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" + + "msgNode=" + nodeId + ", msg=" + msg + ']'); + + return; + } + + assert crdVer != 0L; + } + } + + if (msg instanceof CoordinatorTxCounterRequest) + processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg); + else if (msg instanceof CoordinatorTxAckRequest) + processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); + else if (msg instanceof CoordinatorFutureResponse) + processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); + else if (msg instanceof CoordinatorQueryAckRequest) + processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); + else if (msg instanceof CoordinatorQueryVersionRequest) + processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); + else if (msg instanceof MvccCoordinatorVersionResponse) + processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); + else if (msg instanceof CoordinatorWaitTxsRequest) + processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); + else if (msg instanceof NewCoordinatorQueryAckRequest) + processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg); + else + U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CoordinatorMessageListener[]"; + } + } + /** + * + */ + static class StatCounter { + /** */ + final String name; + + /** */ + final LongAdder8 cntr = new LongAdder8(); + + public StatCounter(String name) { + this.name = name; + } + + void update() { + cntr.increment(); + } + + void update(GridLongList arg) { + throw new UnsupportedOperationException(); + } + + void update(long arg) { + throw new UnsupportedOperationException(); + } + + void dumpInfo(IgniteLogger log) { + long totalCnt = cntr.sumThenReset(); + + if (totalCnt > 0) + log.info(name + " [cnt=" + totalCnt + ']'); + } + } + + /** + * + */ + static class CounterWithAvg extends StatCounter { + /** */ + final LongAdder8 total = new LongAdder8(); + + /** */ + final String avgName; + + CounterWithAvg(String name, String avgName) { + super(name); + + this.avgName = avgName; + } + + @Override void update(GridLongList arg) { + update(arg != null ? arg.size() : 0); + } + + @Override void update(long add) { + cntr.increment(); + + total.add(add); + } + + void dumpInfo(IgniteLogger log) { + long totalCnt = cntr.sumThenReset(); + long totalSum = total.sumThenReset(); + + if (totalCnt > 0) + log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']'); + } + } + + /** + * + */ + private static class WaitTxFuture extends GridFutureAdapter { + /** */ + private final long txId; + + /** + * @param txId Transaction ID. + */ + WaitTxFuture(long txId) { + this.txId = txId; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e0196b00/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java deleted file mode 100644 index 73febc0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ /dev/null @@ -1,1100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.GridComponent; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.GridTopic; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridAtomicLong; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.future.GridCompoundFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.jetbrains.annotations.Nullable; -import org.jsr166.LongAdder8; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; - -/** - * - */ -public class CacheCoordinatorsSharedManager extends GridProcessorAdapter { - /** */ - public static final long COUNTER_NA = 0L; - - /** */ - private static final boolean STAT_CNTRS = false; - - /** */ - private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR; - - /** */ - private static final byte MSG_POLICY = SYSTEM_POOL; - - /** */ - private volatile MvccCoordinator curCrd; - - /** */ - private final AtomicLong mvccCntr = new AtomicLong(1L); - - /** */ - private final GridAtomicLong committedCntr = new GridAtomicLong(1L); - - /** */ - private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); - - /** */ - private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); - - /** */ - private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries(); - - /** */ - private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); - - /** */ - private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); - - /** */ - private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>(); - - /** */ - private final AtomicLong futIdCntr = new AtomicLong(); - - /** */ - private final CountDownLatch crdLatch = new CountDownLatch(1); - - /** Topology version when local node was assigned as coordinator. */ - private long crdVer; - - /** */ - private StatCounter[] statCntrs; - - /** - * @param ver1 First version. - * @param ver2 Second version. - * @return - */ - public static int compareVersions(MvccCoordinatorVersion ver1, MvccCoordinatorVersion ver2) { - assert ver1 != null; - assert ver2 != null; - - int cmp = Long.compare(ver1.coordinatorVersion(), ver2.coordinatorVersion()); - - if (cmp != 0) - return cmp; - - return Long.compare(ver1.counter(), ver2.counter()); - } - - public CacheCoordinatorsSharedManager(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public DiscoveryDataExchangeType discoveryDataType() { - return DiscoveryDataExchangeType.CACHE_CRD_PROC; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - statCntrs = new StatCounter[7]; - - statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); - statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); - statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); - statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); - statCntrs[4] = new StatCounter("TotalRequests"); - statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); - statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); - - ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), - EVT_NODE_FAILED, EVT_NODE_LEFT); - - ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); - } - - /** - * @param log Logger. - */ - public void dumpStatistics(IgniteLogger log) { - if (STAT_CNTRS) { - log.info("Mvcc coordinator statistics: "); - - for (StatCounter cntr : statCntrs) - cntr.dumpInfo(log); - } - } - - /** - * @param tx Transaction. - * @return Counter. - */ - public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) { - assert ctx.localNodeId().equals(currentCoordinatorId()); - - return assignTxCounter(tx.nearXidVersion(), 0L); - } - - /** - * @param crd Coordinator. - * @param lsnr Response listener. - * @param txVer Transaction version. - * @return Counter request future. - */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd, - MvccResponseListener lsnr, - GridCacheVersion txVer) { - assert !ctx.localNodeId().equals(crd.nodeId()); - - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), - crd.nodeId(), - lsnr); - - verFuts.put(fut.id, fut); - - try { - ctx.io().sendToGridTopic(crd.nodeId(), - MSG_TOPIC, - new CoordinatorTxCounterRequest(fut.id, txVer), - MSG_POLICY); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param mvccVer Query version. - */ - public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) { - assert crd != null; - - long trackCntr = mvccVer.counter(); - - MvccLongList txs = mvccVer.activeTransactions(); - - if (txs != null) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); - - if (txId < trackCntr) - trackCntr = txId; - } - } - - Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) : - new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr); - - try { - ctx.io().sendToGridTopic(crd.nodeId(), - MSG_TOPIC, - msg, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e); - } - } - - /** - * @param crd Coordinator. - * @return Counter request future. - */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) { - assert crd != null; - - // TODO IGNITE-3478: special case for local? - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null); - - verFuts.put(fut.id, fut); - - try { - ctx.io().sendToGridTopic(crd.nodeId(), - MSG_TOPIC, - new CoordinatorQueryVersionRequest(fut.id), - MSG_POLICY); - } - catch (IgniteCheckedException e) { - if (verFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crdId Coordinator ID. - * @param txs Transaction IDs. - * @return Future. - */ - public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) { - assert crdId != null; - assert txs != null && txs.size() > 0; - - // TODO IGNITE-3478: special case for local? - - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false); - - ackFuts.put(fut.id, fut); - - try { - ctx.io().sendToGridTopic(crdId, - MSG_TOPIC, - new CoordinatorWaitTxsRequest(fut.id, txs), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } - catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param mvccVer Transaction version. - * @return Acknowledge future. - */ - public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) { - assert crd != null; - assert mvccVer != null; - - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true); - - ackFuts.put(fut.id, fut); - - try { - ctx.io().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } - catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crdId Coordinator node ID. - * @param mvccVer Transaction version. - */ - public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) { - CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter()); - - msg.skipResponse(true); - - try { - ctx.io().sendToGridTopic(crdId, - MSG_TOPIC, - msg, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crdId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crdId + ']', e); - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) { - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); - - return; - } - - MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId()); - - if (STAT_CNTRS) - statCntrs[0].update(res.size()); - - try { - ctx.io().sendToGridTopic(node, - MSG_TOPIC, - res, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - - /** - * - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) { - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); - - return; - } - - MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId()); - - try { - ctx.io().sendToGridTopic(node, - MSG_TOPIC, - res, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - - onNodeFailed(nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); - - onQueryDone(res.counter()); - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) { - MvccVersionFuture fut = verFuts.remove(msg.futureId()); - - if (fut != null) { - if (STAT_CNTRS) - statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000); - - fut.onResponse(msg); - } - else { - if (ctx.discovery().alive(nodeId)) - U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); - } - } - - /** - * @param msg Message. - */ - private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) { - onQueryDone(msg.counter()); - } - - /** - * @param nodeId Node ID. - * @param msg Message. - */ - private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) { - prevCrdQueries.onQueryDone(nodeId, msg); - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { - onTxDone(msg.txCounter()); - - if (STAT_CNTRS) - statCntrs[2].update(); - - if (!msg.skipResponse()) { - try { - ctx.io().sendToGridTopic(nodeId, - MSG_TOPIC, - new CoordinatorFutureResponse(msg.futureId()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { - WaitAckFuture fut = ackFuts.remove(msg.futureId()); - - if (fut != null) { - if (STAT_CNTRS) { - StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6]; - - cntr.update((System.nanoTime() - fut.startTime) * 1000); - } - - fut.onResponse(); - } - else { - if (ctx.discovery().alive(nodeId)) - U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); - } - } - - /** - * @param txId Transaction ID. - * @return Counter. - */ - private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { - assert crdVer != 0; - - long nextCtr = mvccCntr.incrementAndGet(); - - // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? - MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - - for (Long txVer : activeTxs.keySet()) - res.addTx(txVer); - - Object old = activeTxs.put(nextCtr, txId); - - assert old == null : txId; - - long cleanupVer; - - if (prevCrdQueries.previousQueriesDone()) { - cleanupVer = committedCntr.get() - 1; - - for (Long qryVer : activeQueries.keySet()) { - if (qryVer <= cleanupVer) - cleanupVer = qryVer - 1; - } - } - else - cleanupVer = -1; - - res.init(futId, crdVer, nextCtr, cleanupVer); - - return res; - } - - /** - * @param txCntr Counter assigned to transaction. - */ - private void onTxDone(Long txCntr) { - GridFutureAdapter fut; // TODO IGNITE-3478. - - GridCacheVersion ver = activeTxs.remove(txCntr); - - assert ver != null; - - committedCntr.setIfGreater(txCntr); - - fut = waitTxFuts.remove(txCntr); - - if (fut != null) - fut.onDone(); - } - - static boolean increment(AtomicInteger cntr) { - for (;;) { - int current = cntr.get(); - - if (current == 0) - return false; - - if (cntr.compareAndSet(current, current + 1)) - return true; - } - } - - /** - * @param qryNodeId Node initiated query. - * @return Counter for query. - */ - private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { - assert crdVer != 0; - - MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - - Long mvccCntr; - - for(;;) { - mvccCntr = committedCntr.get(); - - Long trackCntr = mvccCntr; - - for (Long txVer : activeTxs.keySet()) { - if (txVer < trackCntr) - trackCntr = txVer; - - res.addTx(txVer); - } - - registerActiveQuery(trackCntr); - - if (committedCntr.get() == mvccCntr) - break; - else { - res.resetTransactionsCount(); - - onQueryDone(trackCntr); - } - } - - res.init(futId, crdVer, mvccCntr, COUNTER_NA); - - return res; - } - - private void registerActiveQuery(Long cntr) { - for (;;) { - AtomicInteger qryCnt = activeQueries.get(cntr); - - if (qryCnt != null) { - boolean inc = increment(qryCnt); - - if (!inc) { - activeQueries.remove(mvccCntr, qryCnt); - - continue; - } - } - else { - qryCnt = new AtomicInteger(1); - - if (activeQueries.putIfAbsent(cntr, qryCnt) != null) - continue; - } - - break; - } - } - - private void onNodeFailed(UUID nodeId) { - // TODO - } - - /** - * @param mvccCntr Query counter. - */ - private void onQueryDone(long mvccCntr) { - AtomicInteger cntr = activeQueries.get(mvccCntr); - - assert cntr != null : mvccCntr; - - int left = cntr.decrementAndGet(); - - assert left >= 0 : left; - - if (left == 0) { - boolean rmv = activeQueries.remove(mvccCntr, cntr); - - assert rmv; - } - } - - /** - * @param msg Message. - */ - private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) { - statCntrs[5].update(); - - GridLongList txs = msg.transactions(); - - GridCompoundFuture resFut = null; - - for (int i = 0; i < txs.size(); i++) { - Long txId = txs.get(i); - - WaitTxFuture fut = waitTxFuts.get(txId); - - if (fut == null) { - WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId)); - - if (old != null) - fut = old; - } - - if (!activeTxs.containsKey(txId)) - fut.onDone(); - - if (!fut.isDone()) { - if (resFut == null) - resFut = new GridCompoundFuture(); - - resFut.add(fut); - } - } - - if (resFut != null) - resFut.markInitialized(); - - if (resFut == null || resFut.isDone()) - sendFutureResponse(nodeId, msg); - else { - resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - sendFutureResponse(nodeId, msg); - } - }); - } - } - - /** - * @param nodeId - * @param msg - */ - private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) { - try { - ctx.io().sendToGridTopic(nodeId, - MSG_TOPIC, - new CoordinatorFutureResponse(msg.futureId()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - - public MvccCoordinator currentCoordinator() { - return curCrd; - } - - public UUID currentCoordinatorId() { - MvccCoordinator curCrd = this.curCrd; - - return curCrd != null ? curCrd.nodeId() : null; - } - - /** - * @param topVer Cache affinity version (used for assert). - * @return Coordinator. - */ - public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) { - MvccCoordinator crd = curCrd; - - // Assert coordinator did not already change. - assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : - "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']'; - - return crd; - } - - /** - * @param discoCache Discovery snapshot. - * @return New coordinator. - */ - public MvccCoordinator reassignCoordinator(DiscoCache discoCache) { - assert curCrd == null || !F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd; - - if (!discoCache.serverNodes().isEmpty()) { - ClusterNode node = discoCache.serverNodes().get(0); - - curCrd = new MvccCoordinator(node.id(), - discoCache.version().topologyVersion(), - discoCache.version()); - - log.info("Assigned mvcc coordinator: " + curCrd); - } - else { - curCrd = null; - - log.info("New mvcc coordinator was not assigned [topVer=" + discoCache.version() + ']'); - } - - return curCrd; - } - - /** - * @param nodeId Node ID - * @param activeQueries - */ - public void processClientActiveQueries(UUID nodeId, - @Nullable Map<MvccCounter, Integer> activeQueries) { - prevCrdQueries.processClientActiveQueries(nodeId, activeQueries); - } - - /** - * @param topVer Topology version. - * @param activeQueries Current queries. - */ - public void initCoordinator(AffinityTopologyVersion topVer, - DiscoCache discoCache, - Map<UUID, Map<MvccCounter, Integer>> activeQueries) - { - assert ctx.localNodeId().equals(curCrd.nodeId()); - - log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + - ", topVer=" + topVer + ']'); - - crdVer = topVer.topologyVersion(); - - prevCrdQueries.init(activeQueries, discoCache, ctx.discovery()); - - crdLatch.countDown(); - } - - /** - * - */ - public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> { - /** */ - private final Long id; - - /** */ - private MvccResponseListener lsnr; - - /** */ - public final UUID crdId; - - /** */ - long startTime; - - /** - * @param id Future ID. - * @param crdId Coordinator node ID. - */ - MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) { - this.id = id; - this.crdId = crdId; - this.lsnr = lsnr; - - if (STAT_CNTRS) - startTime = System.nanoTime(); - } - - /** - * @param res Response. - */ - void onResponse(MvccCoordinatorVersionResponse res) { - assert res.counter() != COUNTER_NA; - - if (lsnr != null) - lsnr.onMvccResponse(crdId, res); - - onDone(res); - } - - void onError(IgniteCheckedException err) { - if (verFuts.remove(id) != null) { - if (lsnr != null) - lsnr.onMvccError(err); - - onDone(err); - } - } - - /** - * @param nodeId Failed node ID. - */ - void onNodeLeft(UUID nodeId) { - if (crdId.equals(nodeId)) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + - "coordinator failed: " + nodeId); - - onError(err); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']'; - } - } - - /** - * - */ - private class WaitAckFuture extends GridFutureAdapter<Void> { - /** */ - private final long id; - - /** */ - private final UUID crdId; - - /** */ - long startTime; - - /** */ - final boolean ackTx; - - /** - * @param id Future ID. - * @param crdId Coordinator node ID. - */ - WaitAckFuture(long id, UUID crdId, boolean ackTx) { - this.id = id; - this.crdId = crdId; - this.ackTx = ackTx; - - if (STAT_CNTRS) - startTime = System.nanoTime(); - } - - /** - * - */ - void onResponse() { - onDone(); - } - - /** - * @param nodeId Failed node ID. - */ - void onNodeLeft(UUID nodeId) { - if (crdId.equals(nodeId) && verFuts.remove(id) != null) - onDone(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']'; - } - } - - /** - * - */ - private class CacheCoordinatorNodeFailListener implements GridLocalEventListener { - /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent : evt; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - UUID nodeId = discoEvt.eventNode().id(); - - for (MvccVersionFuture fut : verFuts.values()) - fut.onNodeLeft(nodeId); - - for (WaitAckFuture fut : ackFuts.values()) - fut.onNodeLeft(nodeId); - - prevCrdQueries.onNodeLeft(nodeId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CacheCoordinatorDiscoveryListener[]"; - } - } - /** - * - */ - private class CoordinatorMessageListener implements GridMessageListener { - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg, byte plc) { - if (STAT_CNTRS) - statCntrs[4].update(); - - MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg; - - if (msg0.waitForCoordinatorInit()) { - if (crdVer == 0) { - try { - U.await(crdLatch); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" + - "msgNode=" + nodeId + ", msg=" + msg + ']'); - - return; - } - - assert crdVer != 0L; - } - } - - if (msg instanceof CoordinatorTxCounterRequest) - processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg); - else if (msg instanceof CoordinatorTxAckRequest) - processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); - else if (msg instanceof CoordinatorFutureResponse) - processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); - else if (msg instanceof CoordinatorQueryAckRequest) - processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); - else if (msg instanceof CoordinatorQueryVersionRequest) - processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); - else if (msg instanceof MvccCoordinatorVersionResponse) - processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); - else if (msg instanceof CoordinatorWaitTxsRequest) - processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); - else if (msg instanceof NewCoordinatorQueryAckRequest) - processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg); - else - U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CoordinatorMessageListener[]"; - } - } - /** - * - */ - static class StatCounter { - /** */ - final String name; - - /** */ - final LongAdder8 cntr = new LongAdder8(); - - public StatCounter(String name) { - this.name = name; - } - - void update() { - cntr.increment(); - } - - void update(GridLongList arg) { - throw new UnsupportedOperationException(); - } - - void update(long arg) { - throw new UnsupportedOperationException(); - } - - void dumpInfo(IgniteLogger log) { - long totalCnt = cntr.sumThenReset(); - - if (totalCnt > 0) - log.info(name + " [cnt=" + totalCnt + ']'); - } - } - - /** - * - */ - static class CounterWithAvg extends StatCounter { - /** */ - final LongAdder8 total = new LongAdder8(); - - /** */ - final String avgName; - - CounterWithAvg(String name, String avgName) { - super(name); - - this.avgName = avgName; - } - - @Override void update(GridLongList arg) { - update(arg != null ? arg.size() : 0); - } - - @Override void update(long add) { - cntr.increment(); - - total.add(add); - } - - void dumpInfo(IgniteLogger log) { - long totalCnt = cntr.sumThenReset(); - long totalSum = total.sumThenReset(); - - if (totalCnt > 0) - log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']'); - } - } - - /** - * - */ - private static class WaitTxFuture extends GridFutureAdapter { - /** */ - private final long txId; - - /** - * @param txId Transaction ID. - */ - WaitTxFuture(long txId) { - this.txId = txId; - } - } -}
