http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java new file mode 100644 index 0000000..360af4c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; +
/**
 * Requests an mvcc version for a query from the mvcc coordinator, remembers the coordinator and the
 * received version, and acknowledges the coordinator when the query is done.
 * <p>
 * The {@code mvccCrd} and {@code mvccVer} fields are only read and written while holding this
 * tracker's monitor.
 * <p>
 * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
 */
public class MvccQueryTracker {
    /** Coordinator the version is requested from; reset to {@code null} to mark that a remap is needed. */
    private MvccCoordinator mvccCrd;

    /** Received mvcc version; {@code null} until a version is received and again after {@link #onQueryDone()}. */
    private MvccCoordinatorVersion mvccVer;

    /** Cache context. */
    @GridToStringExclude
    private final GridCacheContext cctx;

    /** {@code True} if the tracker may wait for the next topology version and retry the request. */
    private final boolean canRemap;

    /** Listener notified when a version is received or the request fails. */
    @GridToStringExclude
    private final MvccQueryAware lsnr;

    /**
     * @param cctx Cache context.
     * @param canRemap {@code True} if can wait for topology changes.
     * @param lsnr Listener.
     */
    public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) {
        assert cctx.mvccEnabled() : cctx.name();

        this.cctx = cctx;
        this.canRemap = canRemap;
        this.lsnr = lsnr;
    }

    /**
     * @return Requested mvcc version.
     */
    public MvccCoordinatorVersion mvccVersion() {
        assert mvccVer != null : this;

        return mvccVer;
    }

    /**
     * Handles a change of the mvcc coordinator.
     * <p>
     * If a version was already received and the new coordinator differs from the remembered one, the new
     * coordinator is remembered and the current version is returned. If no version was received yet but a
     * coordinator was already chosen, the remembered coordinator is cleared, which makes the pending
     * version response trigger a remap.
     *
     * @param newCrd New coordinator.
     * @return Version currently held by this tracker if the coordinator actually changed while a version
     *      is held, {@code null} otherwise.
     */
    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
        synchronized (this) {
            if (mvccVer != null) {
                assert mvccCrd != null : this;

                if (!mvccCrd.equals(newCrd)) {
                    mvccCrd = newCrd; // Need notify new coordinator.

                    return mvccVer;
                }
                else
                    return null;
            }
            else if (mvccCrd != null)
                mvccCrd = null; // Mark for remap.

            return null;
        }
    }

    /**
     * Marks the query as finished: clears the held version and, if there was one, acknowledges it to the
     * remembered coordinator. The acknowledgement is sent outside of the monitor. Calling this method when
     * no version is held is a no-op.
     */
    public void onQueryDone() {
        MvccCoordinator mvccCrd0 = null;
        MvccCoordinatorVersion mvccVer0 = null;

        synchronized (this) {
            if (mvccVer != null) {
                assert mvccCrd != null;

                mvccCrd0 = mvccCrd;
                mvccVer0 = mvccVer;

                mvccVer = null; // Mark as finished.
            }
        }

        if (mvccVer0 != null)
            cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
    }

    /**
     * Requests an mvcc version from the coordinator assigned for the given topology version and reports
     * the outcome to the listener.
     * <p>
     * If no coordinator is assigned, or the coordinator differs from the current one and remap is not
     * allowed, the listener receives an error. If remap is allowed, the request is retried on the next
     * topology version.
     *
     * @param topVer Topology version.
     */
    public void requestVersion(final AffinityTopologyVersion topVer) {
        MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);

        if (mvccCrd0 == null) {
            lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));

            return;
        }

        // Remember the coordinator before sending the request, so a concurrent coordinator change can
        // clear it and thereby force a remap when the response arrives.
        synchronized (this) {
            this.mvccCrd = mvccCrd0;
        }

        MvccCoordinator curCrd = cctx.topology().mvccCoordinator();

        if (!mvccCrd0.equals(curCrd)) {
            assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;

            if (!canRemap) {
                lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));

                return;
            }
            else {
                waitNextTopology(topVer);

                return;
            }
        }

        IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
            cctx.shared().coordinators().requestQueryCounter(mvccCrd0);

        cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
            @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
                try {
                    MvccCoordinatorVersion rcvdVer = fut.get();

                    assert rcvdVer != null;

                    boolean needRemap = false;

                    synchronized (MvccQueryTracker.this) {
                        assert mvccVer == null : "[this=" + MvccQueryTracker.this +
                            ", ver=" + mvccVer +
                            ", rcvdVer=" + rcvdVer + "]";

                        // Coordinator is still set: accept the version. Otherwise it was cleared by
                        // onMvccCoordinatorChange() and the received version must not be used.
                        if (mvccCrd != null) {
                            mvccVer = rcvdVer;
                        }
                        else
                            needRemap = true;
                    }

                    if (!needRemap) {
                        lsnr.onMvccVersionReceived(topVer);

                        return;
                    }
                }
                catch (ClusterTopologyCheckedException e) {
                    // Not reported to the listener here: falls through to the remap handling below.
                    IgniteLogger log = cctx.logger(MvccQueryTracker.class);

                    if (log.isDebugEnabled())
                        log.debug("Mvcc coordinator failed, need remap: " + e);
                }
                catch (IgniteCheckedException e) {
                    lsnr.onMvccVersionError(e);

                    return;
                }

                // Coordinator failed or reassigned, need remap.
                if (canRemap)
                    waitNextTopology(topVer);
                else {
                    lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " +
                        "request mvcc version, coordinator failed."));
                }
            }
        });
    }

    /**
     * Waits until affinity is ready for the version following {@code topVer} and then repeats the version
     * request. If there is no future to wait for, the request is repeated immediately with the currently
     * ready affinity version.
     *
     * @param topVer Current topology version.
     */
    private void waitNextTopology(AffinityTopologyVersion topVer) {
        assert canRemap;

        IgniteInternalFuture<AffinityTopologyVersion> waitFut =
            cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());

        if (waitFut == null)
            requestVersion(cctx.shared().exchange().readyAffinityVersion());
        else {
            waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                    try {
                        requestVersion(fut.get());
                    }
                    catch (IgniteCheckedException e) {
                        lsnr.onMvccVersionError(e);
                    }
                }
            });
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(MvccQueryTracker.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java index 11d0da0..627a007 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java @@ -17,13 +17,21 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; /** * */ public interface MvccResponseListener { - public void onMvccResponse(MvccCoordinatorVersion res); + /** + * @param crdId Coordinator node ID. + * @param res Version. + */ + public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res); + /** + * @param e Error. + */ public void onMvccError(IgniteCheckedException e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java new file mode 100644 index 0000000..5631fed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +
/**
 * Mvcc coordinator message carrying a coordinator version and a query counter.
 * <p>
 * NOTE(review): judging by the name and by its use as an acknowledgement of a finished query, this is
 * presumably sent to a new coordinator for a query whose counter was issued by a previous one - confirm
 * against the sender.
 */
public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** Coordinator version. */
    private long crdVer;

    /** Query counter. */
    private long cntr;

    /**
     * Required by {@link GridIoMessageFactory}.
     */
    public NewCoordinatorQueryAckRequest() {
        // No-op.
    }

    /**
     * @param crdVer Coordinator version.
     * @param cntr Query counter.
     */
    NewCoordinatorQueryAckRequest(long crdVer, long cntr) {
        this.crdVer = crdVer;
        this.cntr = cntr;
    }

    /** {@inheritDoc} */
    @Override public boolean waitForCoordinatorInit() {
        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean processedFromNioThread() {
        return true;
    }

    /**
     * @return Coordinator version.
     */
    public long coordinatorVersion() {
        return crdVer;
    }

    /**
     * @return Counter.
     */
    public long counter() {
        return cntr;
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        writer.setBuffer(buf);

        if (!writer.isHeaderWritten()) {
            if (!writer.writeHeader(directType(), fieldsCount()))
                return false;

            writer.onHeaderWritten();
        }

        // Resumable state machine: each case writes one field and falls through to the next one;
        // returning false leaves the writer state unchanged so the same field is retried later.
        switch (writer.state()) {
            case 0:
                if (!writer.writeLong("cntr", cntr))
                    return false;

                writer.incrementState();

            case 1:
                if (!writer.writeLong("crdVer", crdVer))
                    return false;

                writer.incrementState();

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        reader.setBuffer(buf);

        if (!reader.beforeMessageRead())
            return false;

        // Mirrors writeTo(): fields are read in the same order, one per state, with fall-through.
        switch (reader.state()) {
            case 0:
                cntr = reader.readLong("cntr");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 1:
                crdVer = reader.readLong("crdVer");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

        }

        return reader.afterMessageRead(NewCoordinatorQueryAckRequest.class);
    }

    /** {@inheritDoc} */
    @Override public short directType() {
        return 140;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        // Must match the number of fields handled in writeTo()/readFrom(): cntr and crdVer.
        return 2;
    }

    /** {@inheritDoc} */
    @Override public void onAckReceived() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(NewCoordinatorQueryAckRequest.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java new file mode 100644 index 0000000..700b27d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; +
/**
 * Keeps per-node counts of still active queries, keyed by {@link MvccCounter}, and tells when all of
 * them have been acknowledged.
 * <p>
 * Initialization is complete once {@link #init} was called and every alive client node found there has
 * reported its active queries (or has failed). {@link #previousQueriesDone()} becomes {@code true} when
 * initialization is complete and no active queries are left.
 * <p>
 * NOTE(review): judging by the class name these are queries started under a previous mvcc coordinator;
 * confirm against the caller.
 * <p>
 * All state changes are made while holding this object's monitor; {@link #previousQueriesDone()} reads
 * a volatile flag without locking.
 */
class PreviousCoordinatorQueries {
    /** Set once initialization is complete and no active queries remain; never reset. */
    private volatile boolean prevQueriesDone;

    /** Active query counts per node: node ID -> (counter -> number of queries). */
    private final ConcurrentHashMap<UUID, Map<MvccCounter, Integer>> activeQueries = new ConcurrentHashMap<>();

    /** Client nodes that reported their queries before {@link #init} was called. */
    private Set<UUID> rcvd;

    /** Client nodes whose report is still awaited; {@code null} until {@link #init} is called. */
    private Set<UUID> waitNodes;

    /** {@code True} once {@link #init} was called and no awaited client nodes are left. */
    private boolean initDone;

    /**
     * Registers queries of server nodes and determines which client nodes still have to report.
     *
     * @param srvNodesQueries Active queries started on server nodes.
     * @param discoCache Discovery data.
     * @param mgr Discovery manager.
     */
    void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
        synchronized (this) {
            assert !initDone;
            assert waitNodes == null;

            waitNodes = new HashSet<>();

            // Await only alive client nodes that have not reported yet.
            for (ClusterNode node : discoCache.allNodes()) {
                if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
                    waitNodes.add(node.id());
            }

            initDone = waitNodes.isEmpty();

            if (srvNodesQueries != null) {
                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
                    addAwaitedActiveQueries(e.getKey(), e.getValue());
            }

            if (initDone && !prevQueriesDone)
                prevQueriesDone = activeQueries.isEmpty();
        }
    }

    /**
     * @return {@code True} if initialization is complete and all registered active queries are done.
     */
    boolean previousQueriesDone() {
        return prevQueriesDone;
    }

    /**
     * Merges the given query counts into the counts already known for the node. Counts may already be
     * negative if acknowledgements arrived before the node's query list; entries that sum up to zero
     * are dropped. Must be called while holding this object's monitor.
     *
     * @param nodeId Node ID.
     * @param nodeQueries Active queries started on node.
     */
    private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, Integer> nodeQueries) {
        if (F.isEmpty(nodeQueries) || prevQueriesDone)
            return;

        Map<MvccCounter, Integer> queries = activeQueries.get(nodeId);

        if (queries == null) {
            // Copy: the stored map is modified later by onQueryDone(), the caller's map must not be.
            activeQueries.put(nodeId, new HashMap<>(nodeQueries));
        }
        else {
            for (Map.Entry<MvccCounter, Integer> e : nodeQueries.entrySet()) {
                Integer qryCnt = queries.get(e.getKey());

                int newQryCnt = (qryCnt == null ? 0 : qryCnt) + e.getValue();

                if (newQryCnt == 0)
                    queries.remove(e.getKey());
                else
                    queries.put(e.getKey(), newQryCnt);
            }

            // Decide only after the whole merge: removing the map in the middle of the loop would
            // make the remaining entries go to a map that is no longer tracked.
            if (queries.isEmpty())
                activeQueries.remove(nodeId);
        }

        if (initDone && !prevQueriesDone)
            prevQueriesDone = activeQueries.isEmpty();
    }

    /**
     * Processes the list of active queries reported by a client node. Reports received before
     * {@link #init} are remembered so that the node is not awaited later; reports received after
     * initialization is complete are ignored.
     *
     * @param nodeId Node ID.
     * @param nodeQueries Active queries started on node.
     */
    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
        synchronized (this) {
            if (initDone)
                return;

            if (waitNodes == null) {
                if (rcvd == null)
                    rcvd = new HashSet<>();

                rcvd.add(nodeId);
            }
            else {
                waitNodes.remove(nodeId);

                // Initialization is complete only when every awaited node has answered, not when
                // the first of them has.
                initDone = waitNodes.isEmpty();
            }

            addAwaitedActiveQueries(nodeId, nodeQueries);

            if (initDone && !prevQueriesDone)
                prevQueriesDone = activeQueries.isEmpty();
        }
    }

    /**
     * Stops waiting for the failed node and forgets its queries, since a failed node can neither report
     * nor acknowledge them.
     *
     * @param nodeId Failed node ID.
     */
    void onNodeFailed(UUID nodeId) {
        synchronized (this) {
            // Never reset an already completed initialization.
            if (!initDone && waitNodes != null) {
                waitNodes.remove(nodeId);

                initDone = waitNodes.isEmpty();
            }

            activeQueries.remove(nodeId);

            if (initDone && !prevQueriesDone)
                prevQueriesDone = activeQueries.isEmpty();
        }
    }

    /**
     * Processes an acknowledgement of a finished query: decrements the count for the query's counter.
     * If the acknowledgement arrives before the node's query list, a negative count is stored and is
     * compensated when the list is merged.
     *
     * @param nodeId Node ID.
     * @param msg Message.
     */
    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
        synchronized (this) {
            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());

            Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);

            if (nodeQueries == null)
                activeQueries.put(nodeId, nodeQueries = new HashMap<>());

            Integer qryCnt = nodeQueries.get(cntr);

            int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;

            if (newQryCnt == 0) {
                nodeQueries.remove(cntr);

                if (nodeQueries.isEmpty()) {
                    activeQueries.remove(nodeId);

                    if (initDone && !prevQueriesDone)
                        prevQueriesDone = activeQueries.isEmpty();
                }
            }
            else
                nodeQueries.put(cntr, newQryCnt);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java new file mode 100644 index 0000000..428d707 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +
/**
 * Mvcc information attached to a transaction: a coordinator identifier together with the mvcc version
 * assigned to the transaction.
 */
public class TxMvccInfo implements Message {
    /** */
    private static final long serialVersionUID = 0L;

    /** Coordinator identifier (NOTE(review): presumably the coordinator node ID - confirm). */
    private UUID crd;

    /** Mvcc version. */
    private MvccCoordinatorVersion mvccVer;

    /**
     * Empty constructor, leaves all fields unset; they are filled by {@link #readFrom}.
     */
    public TxMvccInfo() {
        // No-op.
    }

    /**
     * @param crd Coordinator identifier, must not be {@code null}.
     * @param mvccVer Mvcc version, must not be {@code null}.
     */
    public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
        assert crd != null;
        assert mvccVer != null;

        this.crd = crd;
        this.mvccVer = mvccVer;
    }

    /**
     * @return Coordinator identifier.
     */
    public UUID coordinator() {
        return crd;
    }

    /**
     * @return Mvcc version.
     */
    public MvccCoordinatorVersion version() {
        return mvccVer;
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        writer.setBuffer(buf);

        if (!writer.isHeaderWritten()) {
            if (!writer.writeHeader(directType(), fieldsCount()))
                return false;

            writer.onHeaderWritten();
        }

        // Resumable state machine: each case writes one field and falls through to the next one.
        switch (writer.state()) {
            case 0:
                if (!writer.writeUuid("crd", crd))
                    return false;

                writer.incrementState();

            case 1:
                if (!writer.writeMessage("mvccVer", mvccVer))
                    return false;

                writer.incrementState();

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        reader.setBuffer(buf);

        if (!reader.beforeMessageRead())
            return false;

        // Mirrors writeTo(): fields are read in the same order, one per state, with fall-through.
        switch (reader.state()) {
            case 0:
                crd = reader.readUuid("crd");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 1:
                mvccVer = reader.readMessage("mvccVer");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

        }

        return reader.afterMessageRead(TxMvccInfo.class);
    }

    /** {@inheritDoc} */
    @Override public short directType() {
        return 139;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        // Must match the number of fields handled in writeTo()/readFrom(): crd and mvccVer.
        return 2;
    }

    /** {@inheritDoc} */
    @Override public void onAckReceived() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(TxMvccInfo.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 5df74b8..0fb8adf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -197,7 +197,7 @@ public class IgniteWalIteratorFactory { dbMgr.setPageSize(pageSize); return new GridCacheSharedContext<>( - kernalCtx, null, null, null, null, + kernalCtx, null, null, null, null, null, dbMgr, null, null, null, null, null, null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 07be8b4..db575f9 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -440,6 +441,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public CacheCoordinatorsProcessor coordinators() { + return null; + } + + /** {@inheritDoc} */ @Override public void markSegmented() { } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 3433b4f..3a269db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; @@ -535,11 +536,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage String clsName = qry.query().queryClassName(); // TODO IGNITE-3478. - final ClusterNode mvccCrd; + final MvccCoordinator mvccCrd; final MvccCoordinatorVersion mvccVer; if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion()); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index b711a80..3ddee2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -825,7 +826,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws IgniteCheckedException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, ClusterNode mvccCrd) + private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, MvccCoordinator mvccCrd) throws IgniteCheckedException { final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); @@ -1461,11 +1462,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - final ClusterNode mvccCrd; + final MvccCoordinator mvccCrd; // TODO IGNITE-3478. 
if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion()); + mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); @@ -2915,7 +2916,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private IgniteCacheExpiryPolicy expiryPlc; /** */ - private ClusterNode mvccCrd; + private MvccCoordinator mvccCrd; /** */ private MvccCoordinatorVersion mvccVer; @@ -2938,7 +2939,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte IgniteBiPredicate<K, V> scanFilter, boolean locNode, GridCacheContext cctx, - ClusterNode mvccCrd, + MvccCoordinator mvccCrd, IgniteLogger log) { assert mvccCrd == null || qry.mvccVersion() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 091ecc5..5009bd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; @@ -637,7 +637,7 @@ public interface IgniteInternalTx { public void commitError(Throwable e); /** - * @param mvccVer Version. + * @param mvccInfo Mvcc information. */ - public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer); + public void mvccInfo(TxMvccInfo mvccInfo); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index ee7dfd2..3b6db58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -59,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import 
org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -254,7 +254,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement protected ConsistentIdMapper consistentIdMapper; /** */ - protected MvccCoordinatorVersion mvccVer; + protected TxMvccInfo mvccInfo; /** * Empty constructor required for {@link Externalizable}. @@ -374,13 +374,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement consistentIdMapper = new ConsistentIdMapper(cctx.discovery()); } - public MvccCoordinatorVersion mvccCoordinatorVersion() { - return mvccVer; + /** + * @return Mvcc info. + */ + @Nullable public TxMvccInfo mvccInfo() { + return mvccInfo; } /** {@inheritDoc} */ - @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) { - this.mvccVer = mvccVer; + @Override public void mvccInfo(TxMvccInfo mvccInfo) { + this.mvccInfo = mvccInfo; } /** @@ -1893,7 +1896,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) { + @Override public void mvccInfo(TxMvccInfo mvccInfo) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index ef42a14..24f2a8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -605,6 +605,8 @@ public class IgniteTxHandler { if (expVer.equals(curVer)) return false; + // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs. + for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { GridCacheContext ctx = e.context(); @@ -860,7 +862,7 @@ public class IgniteTxHandler { tx = ctx.tm().tx(dhtVer); if (tx != null) { - tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion()); + tx.mvccInfo(req.mvccInfo()); req.txState(tx.txState()); } @@ -1312,7 +1314,7 @@ public class IgniteTxHandler { tx.commitVersion(req.commitVersion()); tx.invalidate(req.isInvalidate()); tx.systemInvalidate(req.isSystemInvalidate()); - tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion()); + tx.mvccInfo(req.mvccInfo()); // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); @@ -1359,7 +1361,7 @@ public class IgniteTxHandler { try { tx.commitVersion(req.writeVersion()); tx.invalidate(req.isInvalidate()); - tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion()); + tx.mvccInfo(req.mvccInfo()); // Complete remote candidates. 
tx.doneRemote(req.version(), null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ab70e95..92e6785 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -358,6 +358,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @param ret Result. */ public void implicitSingleResult(GridCacheReturn ret) { + assert ret != null; + if (ret.invokeResult()) implicitRes.mergeEntryProcessResults(ret); else @@ -518,7 +520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig try { cctx.tm().txContext(this); - assert !txState.mvccEnabled(cctx) || mvccVer != null; + assert !txState.mvccEnabled(cctx) || mvccInfo != null; AffinityTopologyVersion topVer = topologyVersion(); @@ -698,7 +700,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); if (updRes.success()) { txEntry.updateCounter(updRes.updatePartitionCounter()); @@ -736,7 +738,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccVer); + mvccInfo != null ? 
mvccInfo.version() : null); } } else if (op == DELETE) { @@ -758,7 +760,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); if (updRes.success()) txEntry.updateCounter(updRes.updatePartitionCounter()); @@ -782,7 +784,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); } } else if (op == RELOAD) { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index a076e5c..3fc0962 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; @@ -60,7 +60,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i if (storeMvccVersion()) { assert row.mvccCoordinatorVersion() > 0 : row; - assert row.mvccCounter() != 
CacheCoordinatorsSharedManager.COUNTER_NA : row; + assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row; PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion()); off += 8; @@ -123,7 +123,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx); assert mvccTopVer > 0 : mvccTopVer; - assert mvcCntr != CacheCoordinatorsSharedManager.COUNTER_NA; + assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccTopVer); off += 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index a3a8416..a4eac3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; @@ -62,7 +62,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp long mvccUpdateCntr = row.mvccCounter(); assert mvccCrdVer > 0 : mvccCrdVer; - assert 
mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA; + assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA; PageUtils.putLong(pageAddr, off, mvccCrdVer); off += 8; @@ -98,7 +98,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx); assert mvccUpdateTopVer >=0 : mvccUpdateCntr; - assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA; + assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer); off += 8; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index 7345106..767c996 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -21,7 +21,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; @@ -167,7 +167,7 @@ public class CacheDataTree 
extends BPlusTree<CacheSearchRow, CacheDataRow> { long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx); - assert row.mvccCounter() != CacheCoordinatorsSharedManager.COUNTER_NA; + assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA; cmp = Long.compare(row.mvccCounter(), mvccCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java index 62a07b1..fc9d15d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; /** @@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO { /** {@inheritDoc} */ @Override public long getMvccUpdateCounter(long pageAddr, int idx) { - return CacheCoordinatorsSharedManager.COUNTER_NA; + return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java index e22a2a0..b328924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; /** @@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO { /** {@inheritDoc} */ @Override public long getMvccUpdateCounter(long pageAddr, int idx) { - return CacheCoordinatorsSharedManager.COUNTER_NA; + return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java index b334e3d..0d424b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache.tree; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO { /** {@inheritDoc} */ @Override public long getMvccUpdateCounter(long pageAddr, int idx) { - return CacheCoordinatorsSharedManager.COUNTER_NA; + return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java index 28460f8..ff51bc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache.tree; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO { /** {@inheritDoc} */ @Override public long getMvccUpdateCounter(long pageAddr, int idx) { - return CacheCoordinatorsSharedManager.COUNTER_NA; + return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java index 09dc739..50f1475 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -46,7 +46,7 @@ public class MvccDataRow extends DataRow { super(grp, hash, link, part, rowData); assert crdVer > 0 : crdVer; - assert mvccCntr != CacheCoordinatorsSharedManager.COUNTER_NA; + assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA; this.crdVer = crdVer; this.mvccCntr = mvccCntr; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java index 8eb667c..5bdc495 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java @@ -18,7 +18,7 @@ 
package org.apache.ignite.internal.processors.cache.tree; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.util.typedef.internal.S; @@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow { /** {@inheritDoc} */ @Override public long mvccCounter() { - return CacheCoordinatorsSharedManager.COUNTER_NA; + return CacheCoordinatorsProcessor.COUNTER_NA; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index a724060..87f5882 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -285,12 +285,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig onDone(rdc != null ? rdc.reduce() : null); } catch (RuntimeException e) { - logError(null, "Failed to execute compound future reducer: " + this, e); + logError(logger(), "Failed to execute compound future reducer: " + this, e); onDone(e); } catch (AssertionError e) { - logError(null, "Failed to execute compound future reducer: " + this, e); + logError(logger(), "Failed to execute compound future reducer: " + this, e); onDone(e);
