http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java new file mode 100644 index 0000000..ac55164 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -0,0 +1,1304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import 
org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.jetbrains.annotations.Nullable;
import org.jsr166.LongAdder8;

import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;

/**
 * Processor implementing the MVCC coordinator protocol.
 * <p>
 * On discovery events it selects the coordinator node and publishes it through discovery data.
 * On the coordinator node it assigns counters to transactions and queries, tracks active transactions
 * and active queries, and answers requests received on {@link GridTopic#TOPIC_CACHE_COORDINATOR}.
 * On other nodes it sends requests to the coordinator and completes the corresponding local futures
 * when responses arrive or when the coordinator node leaves.
 */
public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
    /** Counter value meaning "not available". */
    public static final long COUNTER_NA = 0L;

    /** Compile-time switch enabling collection of message statistics. */
    private static final boolean STAT_CNTRS = false;

    /** Topic used for all coordinator messages. */
    private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR;

    /** IO policy used for all coordinator messages. */
    private static final byte MSG_POLICY = SYSTEM_POOL;

    /** Current coordinator, set through {@link #currentCoordinator(MvccCoordinator)}. */
    private volatile MvccCoordinator curCrd;

    /** Source of counters assigned to transactions. */
    private final AtomicLong mvccCntr = new AtomicLong(1L);

    /** Greatest counter among finished transactions. */
    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);

    /** Active transactions: assigned counter -> transaction version. */
    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>();

    /** Queries registered on this coordinator. */
    private final ActiveQueries activeQueries = new ActiveQueries();

    /** Queries started on previous coordinator. */
    private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries();

    /** Pending version requests: future ID -> future. */
    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>();

    /** Pending ack/wait requests: future ID -> future. */
    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();

    /** Futures completed when a transaction finishes: transaction counter -> future. */
    private final ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>();

    /** Generator of future IDs. */
    private final AtomicLong futIdCntr = new AtomicLong();

    /** Latch released by {@link #initCoordinator} once the local node is initialized as coordinator. */
    private final CountDownLatch crdLatch = new CountDownLatch(1);

    /** Topology version when local node was assigned as coordinator. */
    private long crdVer;

    /** Statistics counters, see {@link #start()} for the meaning of each index. */
    private StatCounter[] statCntrs;

    /** Discovery data holding the currently assigned coordinator. */
    private CacheCoordinatorsDiscoveryData discoData = new CacheCoordinatorsDiscoveryData(null);

    /** For tests only. */
    private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;

    /**
     * @param ctx Context.
     */
    public CacheCoordinatorsProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public void start() throws IgniteCheckedException {
        statCntrs = new StatCounter[7];

        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
        statCntrs[4] = new StatCounter("TotalRequests");
        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime");

        ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(),
            EVT_NODE_FAILED, EVT_NODE_LEFT);

        ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener());
    }

    /** {@inheritDoc} */
    @Override public DiscoveryDataExchangeType discoveryDataType() {
        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
    }

    /** {@inheritDoc} */
    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
        Integer cmpId = discoveryDataType().ordinal();

        if (!dataBag.commonDataCollectedFor(cmpId))
            dataBag.addGridCommonData(cmpId, discoData);
    }

    /** {@inheritDoc} */
    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
        discoData = (CacheCoordinatorsDiscoveryData)data.commonData();

        assert discoData != null;
    }

    /**
     * @return Discovery data.
     */
    public CacheCoordinatorsDiscoveryData discoveryData() {
        return discoData;
    }

    /**
     * For testing only.
     *
     * @param crdC Closure assigning coordinator.
     */
    static void coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC) {
        CacheCoordinatorsProcessor.crdC = crdC;
    }

    /**
     * Updates discovery data with the coordinator valid after the given discovery event.
     * Metrics updates are ignored. On segmentation or client disconnect the coordinator is reset
     * to {@code null}. Otherwise the existing coordinator is kept unless there is none or its node
     * is missing from {@code nodes} after a fail/left event, in which case a new one is assigned.
     *
     * @param evtType Event type.
     * @param nodes Current nodes.
     * @param topVer Topology version.
     */
    public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
        if (evtType == EVT_NODE_METRICS_UPDATED)
            return;

        MvccCoordinator crd;

        if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED)
            crd = null;
        else {
            crd = discoData.coordinator();

            if (crd == null ||
                ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) {
                ClusterNode crdNode = null;

                if (crdC != null) {
                    crdNode = crdC.apply(nodes);

                    // Log the node just chosen by the closure; 'crd' still refers to the previous coordinator here.
                    log.info("Assigned coordinator using test closure: " + crdNode);
                }
                else {
                    // Expect nodes are sorted by order.
                    for (ClusterNode node : nodes) {
                        if (!CU.clientNode(node)) {
                            crdNode = node;

                            break;
                        }
                    }
                }

                crd = crdNode != null ?
                    new MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;

                if (crd != null)
                    log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
                else
                    U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
            }
        }

        discoData = new CacheCoordinatorsDiscoveryData(crd);
    }

    /**
     * Prints collected statistics if {@link #STAT_CNTRS} is enabled.
     *
     * @param log Logger.
     */
    public void dumpStatistics(IgniteLogger log) {
        if (STAT_CNTRS) {
            log.info("Mvcc coordinator statistics: ");

            for (StatCounter cntr : statCntrs)
                cntr.dumpInfo(log);
        }
    }

    /**
     * Assigns transaction counter locally; local node must be the current coordinator.
     *
     * @param tx Transaction.
     * @return Counter.
     */
    public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
        assert ctx.localNodeId().equals(currentCoordinatorId());

        return assignTxCounter(tx.nearXidVersion(), 0L);
    }

    /**
     * Requests transaction counter from remote coordinator.
     *
     * @param crd Coordinator.
     * @param lsnr Response listener.
     * @param txVer Transaction version.
     * @return Counter request future.
     */
    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd,
        MvccResponseListener lsnr,
        GridCacheVersion txVer) {
        assert !ctx.localNodeId().equals(crd.nodeId());

        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
            crd.nodeId(),
            lsnr);

        verFuts.put(fut.id, fut);

        try {
            ctx.io().sendToGridTopic(crd.nodeId(),
                MSG_TOPIC,
                new CoordinatorTxCounterRequest(fut.id, txVer),
                MSG_POLICY);
        }
        catch (IgniteCheckedException e) {
            // Fail the future only if nobody else (response or node-left handler) already took it.
            if (verFuts.remove(fut.id) != null)
                fut.onError(e);
        }

        return fut;
    }

    /**
     * Notifies coordinator that query is finished. Send failures are only logged.
     *
     * @param crd Coordinator.
     * @param mvccVer Query version.
     */
    public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
        assert crd != null;

        // Tracking counter is the minimum of query counter and all transactions active for it.
        long trackCntr = mvccVer.counter();

        MvccLongList txs = mvccVer.activeTransactions();

        if (txs != null) {
            for (int i = 0; i < txs.size(); i++) {
                long txId = txs.get(i);

                if (txId < trackCntr)
                    trackCntr = txId;
            }
        }

        // Different message type is used if version was assigned by another coordinator version.
        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
            new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);

        try {
            ctx.io().sendToGridTopic(crd.nodeId(),
                MSG_TOPIC,
                msg,
                MSG_POLICY);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e);
        }
    }

    /**
     * Requests query version from coordinator.
     *
     * @param crd Coordinator.
     * @return Counter request future.
     */
    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) {
        assert crd != null;

        // TODO IGNITE-3478: special case for local?
        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);

        verFuts.put(fut.id, fut);

        try {
            ctx.io().sendToGridTopic(crd.nodeId(),
                MSG_TOPIC,
                new CoordinatorQueryVersionRequest(fut.id),
                MSG_POLICY);
        }
        catch (IgniteCheckedException e) {
            if (verFuts.remove(fut.id) != null)
                fut.onError(e);
        }

        return fut;
    }

    /**
     * Asks coordinator to respond when all given transactions are finished.
     *
     * @param crdId Coordinator ID.
     * @param txs Transaction IDs.
     * @return Future.
     */
    public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) {
        assert crdId != null;
        assert txs != null && txs.size() > 0;

        // TODO IGNITE-3478: special case for local?
        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false);

        ackFuts.put(fut.id, fut);

        try {
            ctx.io().sendToGridTopic(crdId,
                MSG_TOPIC,
                new CoordinatorWaitTxsRequest(fut.id, txs),
                MSG_POLICY);
        }
        catch (IgniteCheckedException e) {
            if (ackFuts.remove(fut.id) != null) {
                if (e instanceof ClusterTopologyCheckedException)
                    fut.onDone(); // No need to wait, new coordinator will be assigned, finish without error.
                else
                    fut.onDone(e);
            }
        }

        return fut;
    }

    /**
     * Notifies coordinator about transaction commit and waits for its acknowledgement.
     *
     * @param crd Coordinator.
     * @param mvccVer Transaction version.
     * @return Acknowledge future.
     */
    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
        assert crd != null;
        assert mvccVer != null;

        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);

        ackFuts.put(fut.id, fut);

        try {
            ctx.io().sendToGridTopic(crd,
                MSG_TOPIC,
                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
                MSG_POLICY);
        }
        catch (IgniteCheckedException e) {
            if (ackFuts.remove(fut.id) != null) {
                if (e instanceof ClusterTopologyCheckedException)
                    fut.onDone(); // No need to ack, finish without error.
                else
                    fut.onDone(e);
            }
        }

        return fut;
    }

    /**
     * Notifies coordinator about transaction rollback; no response is requested.
     *
     * @param crdId Coordinator node ID.
     * @param mvccVer Transaction version.
     */
    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());

        msg.skipResponse(true);

        try {
            ctx.io().sendToGridTopic(crdId,
                MSG_TOPIC,
                msg,
                MSG_POLICY);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crdId + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crdId + ']', e);
        }
    }

    /**
     * Assigns counter for remote transaction and sends it back to the requester.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
        ClusterNode node = ctx.discovery().node(nodeId);

        if (node == null) {
            if (log.isDebugEnabled())
                log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

            return;
        }

        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId());

        if (STAT_CNTRS)
            statCntrs[0].update(res.size());

        try {
            ctx.io().sendToGridTopic(node,
                MSG_TOPIC,
                res,
                MSG_POLICY);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e);
        }
    }

    /**
     * Assigns version for remote query and sends it back to the requester.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) {
        ClusterNode node = ctx.discovery().node(nodeId);

        if (node == null) {
            if (log.isDebugEnabled())
                log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

            return;
        }

        MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId());

        try {
            ctx.io().sendToGridTopic(node,
                MSG_TOPIC,
                res,
                MSG_POLICY);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);

            // Response was not delivered, unregister the query just registered.
            onQueryDone(nodeId, res.counter());
        }
    }

    /**
     * Completes local version future with coordinator response.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) {
        MvccVersionFuture fut = verFuts.remove(msg.futureId());

        if (fut != null) {
            // NOTE(review): elapsed nanoseconds are multiplied by 1000 here; confirm the intended time unit.
            if (STAT_CNTRS)
                statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000);

            fut.onResponse(msg);
        }
        else {
            if (ctx.discovery().alive(nodeId))
                U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
            else if (log.isDebugEnabled())
                log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * @param nodeId Node ID.
     * @param msg Message.
     */
    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
        onQueryDone(nodeId, msg.counter());
    }

    /**
     * @param nodeId Node ID.
     * @param msg Message.
     */
    private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
        prevCrdQueries.onQueryDone(nodeId, msg);
    }

    /**
     * Marks transaction as finished and, unless response is skipped, acknowledges it to the sender.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
        onTxDone(msg.txCounter());

        if (STAT_CNTRS)
            statCntrs[2].update();

        if (!msg.skipResponse()) {
            try {
                ctx.io().sendToGridTopic(nodeId,
                    MSG_TOPIC,
                    new CoordinatorFutureResponse(msg.futureId()),
                    MSG_POLICY);
            }
            catch (ClusterTopologyCheckedException e) {
                if (log.isDebugEnabled())
                    log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
            }
        }
    }

    /**
     * Completes local ack/wait future with coordinator response.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) {
        WaitAckFuture fut = ackFuts.remove(msg.futureId());

        if (fut != null) {
            if (STAT_CNTRS) {
                StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6];

                // NOTE(review): elapsed nanoseconds are multiplied by 1000 here; confirm the intended time unit.
                cntr.update((System.nanoTime() - fut.startTime) * 1000);
            }

            fut.onResponse();
        }
        else {
            if (ctx.discovery().alive(nodeId))
                U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
            else if (log.isDebugEnabled())
                log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
        }
    }

    /**
     * Assigns next counter to the transaction and registers it as active. The response lists
     * transactions that were active before this one was registered.
     *
     * @param txId Transaction ID.
     * @param futId Future ID to put into response.
     * @return Counter.
     */
    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) {
        assert crdVer != 0;

        long nextCtr = mvccCntr.incrementAndGet();

        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();

        for (Long txVer : activeTxs.keySet())
            res.addTx(txVer);

        Object old = activeTxs.put(nextCtr, txId);

        assert old == null : txId;

        long cleanupVer;

        if (prevCrdQueries.previousQueriesDone()) {
            cleanupVer = committedCntr.get() - 1;

            Long qryVer = activeQueries.minimalQueryCounter();

            // Cleanup version must stay below the minimal counter tracked by active queries.
            if (qryVer != null && qryVer <= cleanupVer)
                cleanupVer = qryVer - 1;
        }
        else
            cleanupVer = -1;

        res.init(futId, crdVer, nextCtr, cleanupVer);

        return res;
    }

    /**
     * Unregisters finished transaction and completes the future waiting for it, if any.
     *
     * @param txCntr Counter assigned to transaction.
     */
    private void onTxDone(Long txCntr) {
        GridCacheVersion ver = activeTxs.remove(txCntr);

        assert ver != null;

        committedCntr.setIfGreater(txCntr);

        WaitTxFuture fut = waitTxFuts.remove(txCntr); // TODO IGNITE-3478.

        if (fut != null)
            fut.onDone();
    }

    /**
     * Registry of active queries grouped by the node which started them.
     */
    class ActiveQueries {
        /** Node ID -> (tracking counter -> number of queries using it). */
        private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = new HashMap<>();

        /** Minimal tracking counter among registered queries, {@code null} if unknown. */
        private Long minQry;

        /**
         * @return Minimal tracking counter or {@code null}.
         */
        Long minimalQueryCounter() {
            synchronized (this) {
                return minQry;
            }
        }

        /**
         * Builds version response for a query and registers the query for the given node.
         *
         * @param nodeId Node initiated query.
         * @param futId Future ID to put into response.
         * @return Version response.
         */
        synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID nodeId, long futId) {
            MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();

            Long mvccCntr;
            Long trackCntr;

            // Retry until committed counter does not change while active transactions are collected.
            for(;;) {
                mvccCntr = committedCntr.get();

                trackCntr = mvccCntr;

                for (Long txVer : activeTxs.keySet()) {
                    if (txVer < trackCntr)
                        trackCntr = txVer;

                    res.addTx(txVer);
                }

                Long minQry0 = minQry;

                if (minQry == null || trackCntr < minQry)
                    minQry = trackCntr;

                if (committedCntr.get() == mvccCntr)
                    break;

                // Committed counter changed: roll back minimal counter and collected transactions.
                minQry = minQry0;

                res.resetTransactionsCount();
            }

            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);

            if (nodeMap == null)
                activeQueries.put(nodeId, nodeMap = new TreeMap<>());

            AtomicInteger qryCnt = nodeMap.get(trackCntr);

            if (qryCnt == null)
                nodeMap.put(trackCntr, new AtomicInteger(1));
            else
                qryCnt.incrementAndGet();

            res.init(futId, crdVer, mvccCntr, COUNTER_NA);

            return res;
        }

        /**
         * Unregisters one query of the given node; does nothing if the node has no registered queries.
         *
         * @param nodeId Node initiated query.
         * @param mvccCntr Tracking counter of the query.
         */
        synchronized void onQueryDone(UUID nodeId, Long mvccCntr) {
            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);

            if (nodeMap == null)
                return;

            assert minQry != null;

            AtomicInteger qryCnt = nodeMap.get(mvccCntr);

            assert qryCnt != null : "[node=" + nodeId + ", nodeMap=" + nodeMap + ", cntr=" + mvccCntr + "]";

            int left = qryCnt.decrementAndGet();

            if (left == 0) {
                nodeMap.remove(mvccCntr);

                if (mvccCntr == minQry.longValue())
                    minQry = activeMinimal();
            }
        }

        /**
         * Drops all queries of the given node and recalculates minimal counter.
         *
         * @param nodeId Failed node ID.
         */
        synchronized void onNodeFailed(UUID nodeId) {
            activeQueries.remove(nodeId);

            minQry = activeMinimal();
        }

        /**
         * @return Minimal tracking counter over all nodes or {@code null} if there are no queries.
         */
        private Long activeMinimal() {
            Long min = null;

            for (TreeMap<Long, AtomicInteger> m : activeQueries.values()) {
                Map.Entry<Long, AtomicInteger> e = m.firstEntry();

                if (e != null && (min == null || e.getKey() < min))
                    min = e.getKey();
            }

            return min;
        }
    }

    /**
     * @param qryNodeId Node initiated query.
     * @param futId Future ID to put into response.
     * @return Counter for query.
     */
    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
        assert crdVer != 0;

        return activeQueries.assignQueryCounter(qryNodeId, futId);
    }

    /**
     * @param nodeId Node initiated query.
     * @param mvccCntr Query counter.
     */
    private void onQueryDone(UUID nodeId, Long mvccCntr) {
        activeQueries.onQueryDone(nodeId, mvccCntr);
    }

    /**
     * Responds to the sender once none of the requested transactions is active.
     *
     * @param nodeId Sender node ID.
     * @param msg Message.
     */
    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
        if (STAT_CNTRS)
            statCntrs[5].update();

        GridLongList txs = msg.transactions();

        GridCompoundFuture resFut = null;

        for (int i = 0; i < txs.size(); i++) {
            Long txId = txs.get(i);

            WaitTxFuture fut = waitTxFuts.get(txId);

            if (fut == null) {
                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId));

                if (old != null)
                    fut = old;
            }

            // Future is registered before this check, so a concurrent onTxDone() either sees the future
            // or has already removed the transaction from the active map.
            if (!activeTxs.containsKey(txId)) {
                // Transaction is already finished, onTxDone() will not clean up this entry:
                // remove it here, otherwise it stays in the map forever.
                waitTxFuts.remove(txId, fut);

                fut.onDone();
            }

            if (!fut.isDone()) {
                if (resFut == null)
                    resFut = new GridCompoundFuture();

                resFut.add(fut);
            }
        }

        if (resFut != null)
            resFut.markInitialized();

        if (resFut == null || resFut.isDone())
            sendFutureResponse(nodeId, msg);
        else {
            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
                @Override public void apply(IgniteInternalFuture fut) {
                    sendFutureResponse(nodeId, msg);
                }
            });
        }
    }

    /**
     * @param nodeId Node to send response to.
     * @param msg Request being answered.
     */
    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
        try {
            ctx.io().sendToGridTopic(nodeId,
                MSG_TOPIC,
                new CoordinatorFutureResponse(msg.futureId()),
                MSG_POLICY);
        }
        catch (ClusterTopologyCheckedException e) {
            if (log.isDebugEnabled())
                log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
        }
    }

    /**
     * @return Current coordinator, possibly {@code null}.
     */
    public MvccCoordinator currentCoordinator() {
        return curCrd;
    }

    /**
     * @param curCrd New current coordinator.
     */
    public void currentCoordinator(MvccCoordinator curCrd) {
        this.curCrd = curCrd;
    }

    /**
     * @return Current coordinator node ID or {@code null} if coordinator is not set.
     */
    public UUID currentCoordinatorId() {
        MvccCoordinator curCrd = this.curCrd;

        return curCrd != null ? curCrd.nodeId() : null;
    }

    /**
     * @param topVer Cache affinity version (used for assert).
     * @return Coordinator.
     */
    public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
        MvccCoordinator crd = curCrd;

        // Assert coordinator did not already change.
        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 :
            "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';

        return crd;
    }

    /**
     * @param nodeId Node ID
     * @param activeQueries Active queries.
     */
    public void processClientActiveQueries(UUID nodeId,
        @Nullable Map<MvccCounter, Integer> activeQueries) {
        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
    }

    /**
     * Initializes local node as coordinator and releases message handlers waiting for initialization.
     *
     * @param topVer Topology version.
     * @param discoCache Discovery data.
     * @param activeQueries Current queries.
     */
    public void initCoordinator(AffinityTopologyVersion topVer,
        DiscoCache discoCache,
        Map<UUID, Map<MvccCounter, Integer>> activeQueries)
    {
        assert ctx.localNodeId().equals(curCrd.nodeId());

        log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
            ", topVer=" + topVer + ']');

        crdVer = topVer.topologyVersion();

        prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());

        crdLatch.countDown();
    }

    /**
     * Prints pending version and ack futures.
     *
     * @param log Logger.
     * @param diagCtx Diagnostic request.
     */
    public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) {
        boolean first = true;

        for (MvccVersionFuture verFur : verFuts.values()) {
            if (first) {
                U.warn(log, "Pending mvcc version futures: ");

                first = false;
            }

            U.warn(log, ">>> " + verFur.toString());
        }

        first = true;

        for (WaitAckFuture waitAckFut : ackFuts.values()) {
            if (first) {
                U.warn(log, "Pending mvcc wait ack futures: ");

                first = false;
            }

            U.warn(log, ">>> " + waitAckFut.toString());
        }
    }

    /**
     * Future for a version requested from coordinator.
     */
    public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
        /** Future ID. */
        private final Long id;

        /** Optional listener notified before the future is completed. */
        private MvccResponseListener lsnr;

        /** Coordinator node ID. */
        public final UUID crdId;

        /** Creation time in nanoseconds, set only if statistics are enabled. */
        long startTime;

        /**
         * @param id Future ID.
         * @param crdId Coordinator node ID.
         * @param lsnr Response listener.
         */
        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
            this.id = id;
            this.crdId = crdId;
            this.lsnr = lsnr;

            if (STAT_CNTRS)
                startTime = System.nanoTime();
        }

        /**
         * @param res Response.
         */
        void onResponse(MvccCoordinatorVersionResponse res) {
            assert res.counter() != COUNTER_NA;

            if (lsnr != null)
                lsnr.onMvccResponse(crdId, res);

            onDone(res);
        }

        /**
         * @param err Error.
         */
        void onError(IgniteCheckedException err) {
            if (lsnr != null)
                lsnr.onMvccError(err);

            onDone(err);
        }

        /**
         * Fails this future if the failed node is its coordinator.
         *
         * @param nodeId Failed node ID.
         */
        void onNodeLeft(UUID nodeId ) {
            if (crdId.equals(nodeId) && verFuts.remove(id) != null) {
                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
                    "version, coordinator failed: " + nodeId);

                onError(err);
            }
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
        }
    }

    /**
     * Future for a coordinator acknowledgement (tx ack or wait-for-txs request).
     */
    private class WaitAckFuture extends GridFutureAdapter<Void> {
        /** Future ID. */
        private final long id;

        /** Coordinator node ID. */
        private final UUID crdId;

        /** Creation time in nanoseconds, set only if statistics are enabled. */
        long startTime;

        /** {@code True} for tx ack request, {@code false} for wait-for-txs request. */
        final boolean ackTx;

        /**
         * @param id Future ID.
         * @param crdId Coordinator node ID.
         * @param ackTx Tx ack flag.
         */
        WaitAckFuture(long id, UUID crdId, boolean ackTx) {
            assert crdId != null;

            this.id = id;
            this.crdId = crdId;
            this.ackTx = ackTx;

            if (STAT_CNTRS)
                startTime = System.nanoTime();
        }

        /**
         * Completes future on coordinator response.
         */
        void onResponse() {
            onDone();
        }

        /**
         * Completes this future without error if the failed node is its coordinator.
         *
         * @param nodeId Failed node ID.
         */
        void onNodeLeft(UUID nodeId) {
            if (crdId.equals(nodeId) && ackFuts.remove(id) != null)
                onDone();
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "WaitAckFuture [crdId=" + crdId +
                ", id=" + id +
                ", ackTx=" + ackTx + ']';
        }
    }

    /**
     * Listener for node fail/left events: completes futures bound to the node and drops its queries.
     */
    private class CacheCoordinatorNodeFailListener implements GridLocalEventListener {
        /** {@inheritDoc} */
        @Override public void onEvent(Event evt) {
            assert evt instanceof DiscoveryEvent : evt;

            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

            UUID nodeId = discoEvt.eventNode().id();

            for (MvccVersionFuture fut : verFuts.values())
                fut.onNodeLeft(nodeId);

            for (WaitAckFuture fut : ackFuts.values())
                fut.onNodeLeft(nodeId);

            activeQueries.onNodeFailed(nodeId);

            prevCrdQueries.onNodeFailed(nodeId);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "CacheCoordinatorDiscoveryListener[]";
        }
    }

    /**
     * Dispatches coordinator messages to the corresponding {@code process*} method.
     */
    private class CoordinatorMessageListener implements GridMessageListener {
        /** {@inheritDoc} */
        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
            if (STAT_CNTRS)
                statCntrs[4].update();

            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;

            if (msg0.waitForCoordinatorInit()) {
                // Coordinator is not initialized yet: block until initCoordinator() releases the latch.
                if (crdVer == 0) {
                    try {
                        U.await(crdLatch);
                    }
                    catch (IgniteInterruptedCheckedException e) {
                        U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" +
                            "msgNode=" + nodeId + ", msg=" + msg + ']');

                        return;
                    }

                    assert crdVer != 0L;
                }
            }

            if (msg instanceof CoordinatorTxCounterRequest)
                processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
            else if (msg instanceof CoordinatorTxAckRequest)
                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
            else if (msg instanceof CoordinatorFutureResponse)
                processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
            else if (msg instanceof CoordinatorQueryAckRequest)
                processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
            else if (msg instanceof CoordinatorQueryVersionRequest)
                processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
            else if (msg instanceof MvccCoordinatorVersionResponse)
                processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg);
            else if (msg instanceof CoordinatorWaitTxsRequest)
                processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
            else if (msg instanceof NewCoordinatorQueryAckRequest)
                processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg);
            else
                U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "CoordinatorMessageListener[]";
        }
    }

    /**
     * Named event counter.
     */
    static class StatCounter {
        /** Counter name printed in statistics. */
        final String name;

        /** Number of updates. */
        final LongAdder8 cntr = new LongAdder8();

        /**
         * @param name Counter name.
         */
        public StatCounter(String name) {
            this.name = name;
        }

        /**
         * Increments the counter.
         */
        void update() {
            cntr.increment();
        }

        /**
         * Not supported by plain counter.
         *
         * @param arg Value.
         */
        void update(GridLongList arg) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported by plain counter.
         *
         * @param arg Value.
         */
        void update(long arg) {
            throw new UnsupportedOperationException();
        }

        /**
         * Prints and resets the counter; nothing is printed if there were no updates.
         *
         * @param log Logger.
         */
        void dumpInfo(IgniteLogger log) {
            long totalCnt = cntr.sumThenReset();

            if (totalCnt > 0)
                log.info(name + " [cnt=" + totalCnt + ']');
        }
    }

    /**
     * Counter which additionally accumulates a value per update and prints its average.
     */
    static class CounterWithAvg extends StatCounter {
        /** Sum of values passed to updates. */
        final LongAdder8 total = new LongAdder8();

        /** Name of the average printed in statistics. */
        final String avgName;

        /**
         * @param name Counter name.
         * @param avgName Average name.
         */
        CounterWithAvg(String name, String avgName) {
            super(name);

            this.avgName = avgName;
        }

        /** {@inheritDoc} */
        @Override void update(GridLongList arg) {
            update(arg != null ? arg.size() : 0);
        }

        /** {@inheritDoc} */
        @Override void update(long add) {
            cntr.increment();

            total.add(add);
        }

        /** {@inheritDoc} */
        @Override void dumpInfo(IgniteLogger log) {
            long totalCnt = cntr.sumThenReset();
            long totalSum = total.sumThenReset();

            if (totalCnt > 0)
                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']');
        }
    }

    /**
     * Future completed when the transaction with the given counter is finished.
     */
    private static class WaitTxFuture extends GridFutureAdapter {
        /** Transaction counter. */
        private final long txId;

        /**
         * @param txId Transaction ID.
         */
        WaitTxFuture(long txId) {
            this.txId = txId;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java deleted file mode 100644 index c46a624..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ /dev/null @@ -1,999 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.GridTopic; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridAtomicLong; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.future.GridCompoundFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import org.jetbrains.annotations.Nullable; -import org.jsr166.LongAdder8; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; - -/** - * - */ -public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { - /** */ - public static final long COUNTER_NA = 0L; - - /** */ - private static final boolean STAT_CNTRS = false; - - /** */ - private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR; - - /** */ - private static final byte MSG_POLICY = SYSTEM_POOL; - - /** */ - private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory(); - - /** */ - private final AtomicLong mvccCntr = new AtomicLong(1L); - - /** */ - private final GridAtomicLong committedCntr = new GridAtomicLong(1L); - - /** */ - private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); - - /** */ - private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>(); - - /** */ - private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>(); - - /** */ - private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>(); - - /** */ - private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>(); - - /** */ - private final AtomicLong futIdCntr = new AtomicLong(); - - /** */ - private final CountDownLatch crdLatch = new CountDownLatch(1); - - /** Topology version when local node was assigned as coordinator. 
*/ - private long crdVer; - - /** */ - private StatCounter[] statCntrs; - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - super.start0(); - - statCntrs = new StatCounter[7]; - - statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); - statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime"); - statCntrs[2] = new StatCounter("CoordinatorTxAckRequest"); - statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime"); - statCntrs[4] = new StatCounter("TotalRequests"); - statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); - statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); - - cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(), - EVT_NODE_FAILED, EVT_NODE_LEFT); - - cctx.gridIO().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); - } - - /** - * @param log Logger. - */ - public void dumpStatistics(IgniteLogger log) { - if (STAT_CNTRS) { - log.info("Mvcc coordinator statistics: "); - - for (StatCounter cntr : statCntrs) - cntr.dumpInfo(log); - } - } - - /** - * @param tx Transaction. - * @return Counter. - */ - public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) { - assert cctx.localNode().equals(assignHist.currentCoordinator()); - - return assignTxCounter(tx.nearXidVersion(), 0L); - } - - /** - * @param crd Coordinator. - * @param lsnr Response listener. - * @return Counter request future. 
- */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion txVer) { - assert !crd.isLocal() : crd; - - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), - crd, - lsnr); - - verFuts.put(fut.id, fut); - - try { - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorTxCounterRequest(fut.id, txVer), - MSG_POLICY); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param mvccVer Query version. - */ - public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) { - try { - long trackCntr = mvccVer.counter(); - - MvccLongList txs = mvccVer.activeTransactions(); - - if (txs != null) { - for (int i = 0; i < txs.size(); i++) { - long txId = txs.get(i); - - if (txId < trackCntr) - trackCntr = txId; - } - } - - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorQueryAckRequest(trackCntr), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send query ack, node left [crd=" + crd + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + mvccVer + ']', e); - } - } - - /** - * @param crd Coordinator. - * @return Counter request future. - */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(ClusterNode crd) { - assert crd != null; - - // TODO IGNITE-3478: special case for local? - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null); - - verFuts.put(fut.id, fut); - - try { - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorQueryVersionRequest(fut.id), - MSG_POLICY); - } - catch (IgniteCheckedException e) { - if (verFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param txs Transaction IDs. - * @return Future. 
- */ - public IgniteInternalFuture<Void> waitTxsFuture(ClusterNode crd, GridLongList txs) { - assert crd != null; - assert txs != null && txs.size() > 0; - - // TODO IGNITE-3478: special case for local? - - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, false); - - ackFuts.put(fut.id, fut); - - try { - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorWaitTxsRequest(fut.id, txs), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } - catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param mvccVer Transaction version. - * @return Acknowledge future. - */ - public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, MvccCoordinatorVersion mvccVer) { - assert crd != null; - assert mvccVer != null; - - WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true); - - ackFuts.put(fut.id, fut); - - try { - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } - catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); - } - - return fut; - } - - /** - * @param crd Coordinator. - * @param mvccVer Transaction version. 
- */ - public void ackTxRollback(ClusterNode crd, MvccCoordinatorVersion mvccVer) { - CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter()); - - msg.skipResponse(true); - - try { - cctx.gridIO().sendToGridTopic(crd, - MSG_TOPIC, - msg, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.id() + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.id() + ']', e); - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) { - ClusterNode node = cctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); - - return; - } - - MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId()); - - if (STAT_CNTRS) - statCntrs[0].update(res.size()); - - try { - cctx.gridIO().sendToGridTopic(node, - MSG_TOPIC, - res, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - - /** - * - * @param nodeId Sender node ID. - * @param msg Message. 
- */ - private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) { - ClusterNode node = cctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']'); - - return; - } - - MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId()); - - try { - cctx.gridIO().sendToGridTopic(node, - MSG_TOPIC, - res, - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); - - onQueryDone(res.counter()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); - - onQueryDone(res.counter()); - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) { - MvccVersionFuture fut = verFuts.remove(msg.futureId()); - - if (fut != null) { - if (STAT_CNTRS) - statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000); - - fut.onResponse(msg); - } - else { - if (cctx.discovery().alive(nodeId)) - U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); - } - } - - /** - * @param msg Message. - */ - private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) { - onQueryDone(msg.counter()); - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. 
- */ - private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { - onTxDone(msg.txCounter()); - - if (STAT_CNTRS) - statCntrs[2].update(); - - if (!msg.skipResponse()) { - try { - cctx.gridIO().sendToGridTopic(nodeId, - MSG_TOPIC, - new CoordinatorFutureResponse(msg.futureId()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - } - - /** - * @param nodeId Sender node ID. - * @param msg Message. - */ - private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) { - WaitAckFuture fut = ackFuts.remove(msg.futureId()); - - if (fut != null) { - if (STAT_CNTRS) { - StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6]; - - cntr.update((System.nanoTime() - fut.startTime) * 1000); - } - - fut.onResponse(); - } - else { - if (cctx.discovery().alive(nodeId)) - U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); - else if (log.isDebugEnabled()) - log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); - } - } - - /** - * @param txId Transaction ID. - * @return Counter. - */ - private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) { - assert crdVer != 0; - - long nextCtr = mvccCntr.incrementAndGet(); - - // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? 
- MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - - for (Long txVer : activeTxs.keySet()) - res.addTx(txVer); - - Object old = activeTxs.put(nextCtr, txId); - - assert old == null : txId; - - long cleanupVer = committedCntr.get() - 1; - - for (Long qryVer : activeQueries.keySet()) { - if (qryVer <= cleanupVer) - cleanupVer = qryVer - 1; - } - - res.init(futId, crdVer, nextCtr, cleanupVer); - - return res; - } - - /** - * @param txCntr Counter assigned to transaction. - */ - private void onTxDone(Long txCntr) { - GridFutureAdapter fut; // TODO IGNITE-3478. - - GridCacheVersion ver = activeTxs.remove(txCntr); - - assert ver != null; - - committedCntr.setIfGreater(txCntr); - - fut = waitTxFuts.remove(txCntr); - - if (fut != null) - fut.onDone(); - } - - static boolean increment(AtomicInteger cntr) { - for (;;) { - int current = cntr.get(); - - if (current == 0) - return false; - - if (cntr.compareAndSet(current, current + 1)) - return true; - } - } - - /** - * @param qryNodeId Node initiated query. - * @return Counter for query. 
- */ - private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) { - assert crdVer != 0; - - MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - - Long mvccCntr; - - for(;;) { - mvccCntr = committedCntr.get(); - - Long trackCntr = mvccCntr; - - for (Long txVer : activeTxs.keySet()) { - if (txVer < trackCntr) - trackCntr = txVer; - - res.addTx(txVer); - } - - registerActiveQuery(trackCntr); - - if (committedCntr.get() == mvccCntr) - break; - else { - res.resetTransactionsCount(); - - onQueryDone(trackCntr); - } - } - - res.init(futId, crdVer, mvccCntr, COUNTER_NA); - - return res; - } - - private void registerActiveQuery(Long cntr) { - for (;;) { - AtomicInteger qryCnt = activeQueries.get(cntr); - - if (qryCnt != null) { - boolean inc = increment(qryCnt); - - if (!inc) { - activeQueries.remove(mvccCntr, qryCnt); - - continue; - } - } - else { - qryCnt = new AtomicInteger(1); - - if (activeQueries.putIfAbsent(cntr, qryCnt) != null) - continue; - } - - break; - } - } - - /** - * @param mvccCntr Query counter. - */ - private void onQueryDone(long mvccCntr) { - AtomicInteger cntr = activeQueries.get(mvccCntr); - - assert cntr != null : mvccCntr; - - int left = cntr.decrementAndGet(); - - assert left >= 0 : left; - - if (left == 0) { - boolean rmv = activeQueries.remove(mvccCntr, cntr); - - assert rmv; - } - } - - /** - * @param msg Message. 
- */ - private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) { - statCntrs[5].update(); - - GridLongList txs = msg.transactions(); - - GridCompoundFuture resFut = null; - - for (int i = 0; i < txs.size(); i++) { - Long txId = txs.get(i); - - WaitTxFuture fut = waitTxFuts.get(txId); - - if (fut == null) { - WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId)); - - if (old != null) - fut = old; - } - - if (!activeTxs.containsKey(txId)) - fut.onDone(); - - if (!fut.isDone()) { - if (resFut == null) - resFut = new GridCompoundFuture(); - - resFut.add(fut); - } - } - - if (resFut != null) - resFut.markInitialized(); - - if (resFut == null || resFut.isDone()) - sendFutureResponse(nodeId, msg); - else { - resFut.listen(new IgniteInClosure<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture fut) { - sendFutureResponse(nodeId, msg); - } - }); - } - } - - /** - * @param nodeId - * @param msg - */ - private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) { - try { - cctx.gridIO().sendToGridTopic(nodeId, - MSG_TOPIC, - new CoordinatorFutureResponse(msg.futureId()), - MSG_POLICY); - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e); - } - } - - /** - * @param topVer Topology version. - * @return MVCC coordinator for given topology version. - */ - @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) { - return assignHist.coordinator(topVer); - } - - /** - * @param discoCache Discovery snapshot. 
- */ - public void assignCoordinator(DiscoCache discoCache) { - ClusterNode curCrd = assignHist.currentCoordinator(); - - if (curCrd == null || !discoCache.allNodes().contains(curCrd)) { - ClusterNode newCrd = null; - - if (!discoCache.serverNodes().isEmpty()) - newCrd = discoCache.serverNodes().get(0); - - if (!F.eq(curCrd, newCrd)) { - assignHist.addAssignment(discoCache.version(), newCrd); - - if (cctx.localNode().equals(newCrd)) { - crdVer = discoCache.version().topologyVersion(); - - crdLatch.countDown(); - } - - log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + - ", crd=" + newCrd + ']'); - - return; - } - } - - assignHist.addAssignment(discoCache.version(), curCrd); - } - - /** - * - */ - public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> { - /** */ - private final Long id; - - /** */ - private MvccResponseListener lsnr; - - /** */ - public final ClusterNode crd; - - /** */ - long startTime; - - /** - * @param id Future ID. - * @param crd Coordinator. - */ - MvccVersionFuture(Long id, ClusterNode crd, @Nullable MvccResponseListener lsnr) { - this.id = id; - this.crd = crd; - this.lsnr = lsnr; - - if (STAT_CNTRS) - startTime = System.nanoTime(); - } - - /** - * @param res Response. - */ - void onResponse(MvccCoordinatorVersionResponse res) { - assert res.counter() != COUNTER_NA; - - if (lsnr != null) - lsnr.onMvccResponse(res); - - onDone(res); - } - - void onError(IgniteCheckedException err) { - if (verFuts.remove(id) != null) { - if (lsnr != null) - lsnr.onMvccError(err); - - onDone(err); - } - } - - /** - * @param nodeId Failed node ID. 
- */ - void onNodeLeft(UUID nodeId) { - if (crd.id().equals(nodeId)) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + - "coordinator failed: " + nodeId); - - onError(err); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "MvccVersionFuture [crd=" + crd + ", id=" + id + ']'; - } - } - - /** - * - */ - private class WaitAckFuture extends GridFutureAdapter<Void> { - /** */ - private final long id; - - /** */ - private final ClusterNode crd; - - /** */ - long startTime; - - /** */ - final boolean ackTx; - - /** - * @param id Future ID. - * @param crd Coordinator. - */ - WaitAckFuture(long id, ClusterNode crd, boolean ackTx) { - this.id = id; - this.crd = crd; - this.ackTx = ackTx; - - if (STAT_CNTRS) - startTime = System.nanoTime(); - } - - /** - * - */ - void onResponse() { - onDone(); - } - - /** - * @param nodeId Failed node ID. - */ - void onNodeLeft(UUID nodeId) { - if (crd.id().equals(nodeId) && verFuts.remove(id) != null) - onDone(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "WaitAckFuture [crd=" + crd + ", id=" + id + ']'; - } - } - - /** - * - */ - private class CacheCoordinatorDiscoveryListener implements GridLocalEventListener { - /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent : evt; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - UUID nodeId = discoEvt.eventNode().id(); - - for (MvccVersionFuture fut : verFuts.values()) - fut.onNodeLeft(nodeId); - - for (WaitAckFuture fut : ackFuts.values()) - fut.onNodeLeft(nodeId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CacheCoordinatorDiscoveryListener[]"; - } - } - /** - * - */ - private class CoordinatorMessageListener implements GridMessageListener { - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg, byte plc) { - if (STAT_CNTRS) - 
statCntrs[4].update(); - - MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg; - - if (msg0.waitForCoordinatorInit()) { - if (crdVer == 0) { - try { - U.await(crdLatch); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" + - "msgNode=" + nodeId + ", msg=" + msg + ']'); - - return; - } - - assert crdVer != 0L; - } - } - - if (msg instanceof CoordinatorTxCounterRequest) - processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg); - else if (msg instanceof CoordinatorTxAckRequest) - processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg); - else if (msg instanceof CoordinatorFutureResponse) - processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg); - else if (msg instanceof CoordinatorQueryAckRequest) - processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg); - else if (msg instanceof CoordinatorQueryVersionRequest) - processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg); - else if (msg instanceof MvccCoordinatorVersionResponse) - processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg); - else if (msg instanceof CoordinatorWaitTxsRequest) - processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg); - else - U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CoordinatorMessageListener[]"; - } - } - /** - * - */ - static class StatCounter { - /** */ - final String name; - - /** */ - final LongAdder8 cntr = new LongAdder8(); - - public StatCounter(String name) { - this.name = name; - } - - void update() { - cntr.increment(); - } - - void update(GridLongList arg) { - throw new UnsupportedOperationException(); - } - - void update(long arg) { - throw new UnsupportedOperationException(); - } - - void dumpInfo(IgniteLogger log) { - long totalCnt = 
cntr.sumThenReset(); - - if (totalCnt > 0) - log.info(name + " [cnt=" + totalCnt + ']'); - } - } - - /** - * - */ - static class CounterWithAvg extends StatCounter { - /** */ - final LongAdder8 total = new LongAdder8(); - - /** */ - final String avgName; - - CounterWithAvg(String name, String avgName) { - super(name); - - this.avgName = avgName; - } - - @Override void update(GridLongList arg) { - update(arg != null ? arg.size() : 0); - } - - @Override void update(long add) { - cntr.increment(); - - total.add(add); - } - - void dumpInfo(IgniteLogger log) { - long totalCnt = cntr.sumThenReset(); - long totalSum = total.sumThenReset(); - - if (totalCnt > 0) - log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']'); - } - } - - /** - * - */ - private static class WaitTxFuture extends GridFutureAdapter { - /** */ - private final long txId; - - /** - * @param txId Transaction ID. - */ - WaitTxFuture(long txId) { - this.txId = txId; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java deleted file mode 100644 index 40354a8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.lang.IgniteBiTuple; - -/** - * - */ -class CoordinatorAssignmentHistory { - /** */ - private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = Collections.emptyMap(); - - /** */ - private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode> - cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null); - - void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) { - assert !assignHist.containsKey(topVer); - assert topVer.compareTo(cur.get1()) > 0; - - cur = new IgniteBiTuple<>(topVer, crd); - - Map<AffinityTopologyVersion, ClusterNode> hist = new HashMap<>(assignHist); - - hist.put(topVer, crd); - - assignHist = hist; - - } - - ClusterNode currentCoordinator() { - return cur.get2(); - } - - ClusterNode coordinator(AffinityTopologyVersion topVer) { - assert topVer.initialized() : topVer; - - IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur; - - if (cur0.get1().equals(topVer)) - return cur0.get2(); - - Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist; - - assert assignHist.containsKey(topVer) : - "No coordinator assignment 
[topVer=" + topVer + ", curVer=" + cur0.get1() + ", hist=" + assignHist0.keySet() + ']'; - - return assignHist0.get(topVer); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java new file mode 100644 index 0000000..0b449d2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + +/** + * MVCC coordinator descriptor; two instances are equal when their coordinator versions match. + */ +public class MvccCoordinator implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Coordinator node ID. */ + private final UUID nodeId; + + /** + * Unique coordinator version, increases when new coordinator is assigned, + * can differ from topVer if we decide to assign coordinator manually. + */ + private final long crdVer; + + /** Topology version when coordinator was assigned. */ + private final AffinityTopologyVersion topVer; + + /** + * @param nodeId Coordinator node ID. + * @param crdVer Coordinator version. + * @param topVer Topology version when coordinator was assigned. + */ + public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) { + assert nodeId != null; + assert crdVer > 0 : crdVer; + assert topVer != null; + + this.nodeId = nodeId; + this.crdVer = crdVer; + this.topVer = topVer; + } + + /** + * @return Unique coordinator version. + */ + public long coordinatorVersion() { + return crdVer; + } + + /** + * @return Coordinator node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Topology version when coordinator was assigned. 
+ */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MvccCoordinator that = (MvccCoordinator)o; + + return crdVer == that.crdVer; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(crdVer ^ (crdVer >>> 32)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MvccCoordinator [node=" + nodeId + ", ver=" + crdVer + ", topVer=" + topVer + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java new file mode 100644 index 0000000..bec3301 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class MvccCounter implements Message { + /** */ + private long crdVer; + + /** */ + private long cntr; + + /** + * + */ + public MvccCounter() { + // No-op. + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + */ + public MvccCounter(long crdVer, long cntr) { + this.crdVer = crdVer; + this.cntr = cntr; + } + + /** + * @return Coordinator version. + */ + public long coordinatorVersion() { + return crdVer; + } + + /** + * @return Counter. + */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MvccCounter that = (MvccCounter) o; + + return crdVer == that.crdVer && cntr == that.cntr; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = (int) (crdVer ^ (crdVer >>> 32)); + res = 31 * res + (int) (cntr ^ (cntr >>> 32)); + return res; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} 
*/ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccCounter.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 141; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccCounter.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java new file mode 100644 index 0000000..d5172c6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface MvccQueryAware { + /** + * @param newCrd New coordinator. + * @return Version used by this query. + */ + @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd); + + /** + * @param topVer Topology version when version was requested. + */ + public void onMvccVersionReceived(AffinityTopologyVersion topVer); + + /** + * @param e Error. + */ + public void onMvccVersionError(IgniteCheckedException e); +}
