http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java new file mode 100644 index 0000000..9f5e0b8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface MvccCoordinatorChangeAware { + /** + * @param newCrd New coordinator. + * @return Version used by this query. + */ + @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd); +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java new file mode 100644 index 0000000..2d4e97b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.UUID; + +/** + * + */ +public interface MvccCoordinatorFuture { + /** + * @return Coordinator node ID. + */ + public UUID coordinatorNodeId(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java index d80e43c..5b2e69e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java @@ -42,4 +42,9 @@ public interface MvccCoordinatorVersion extends Message { * @return Counter. */ public long counter(); + + /** + * @return Version without active transactions. + */ + public MvccCoordinatorVersion withoutActiveTransactions(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java index c037226..b6a4b1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java @@ -46,7 +46,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M private int txsCnt; /** */ - private long[] txs; // TODO IGNITE-3478 (do not send on backups?) + private long[] txs; /** */ private long cleanupVer; @@ -63,7 +63,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M * @param cntr Counter. * @param cleanupVer Cleanup version. */ - public MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) { + MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) { this.crdVer = crdVer; this.cntr = cntr; this.cleanupVer = cleanupVer; @@ -154,6 +154,14 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M } /** {@inheritDoc} */ + @Override public MvccCoordinatorVersion withoutActiveTransactions() { + if (txsCnt > 0) + return new MvccCoordinatorVersionWithoutTxs(crdVer, cntr, cleanupVer); + + return this; + } + + /** {@inheritDoc} */ @Override public long coordinatorVersion() { return crdVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java new file mode 100644 index 0000000..f4a7378 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class MvccCoordinatorVersionWithoutTxs implements MvccCoordinatorVersion { + /** */ + private long crdVer; + + /** */ + private long cntr; + + /** */ + private long cleanupVer; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccCoordinatorVersionWithoutTxs() { + // No-op. + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + * @param cleanupVer Cleanup version. + */ + public MvccCoordinatorVersionWithoutTxs(long crdVer, long cntr, long cleanupVer) { + this.crdVer = crdVer; + this.cntr = cntr; + this.cleanupVer = cleanupVer; + } + + /** {@inheritDoc} */ + @Override public MvccLongList activeTransactions() { + return MvccEmptyLongList.INSTANCE; + } + + /** {@inheritDoc} */ + @Override public long coordinatorVersion() { + return crdVer; + } + + /** {@inheritDoc} */ + @Override public long cleanupVersion() { + return cleanupVer; + } + + /** {@inheritDoc} */ + @Override public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public MvccCoordinatorVersion withoutActiveTransactions() { + return this; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cleanupVer", cleanupVer)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cleanupVer = reader.readLong("cleanupVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccCoordinatorVersionWithoutTxs.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 145; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccCoordinatorVersionWithoutTxs.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java index bec3301..d2fac94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java @@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class MvccCounter implements Message { /** */ + private static final long serialVersionUID = 0L; + + /** */ private long crdVer; /** */ @@ -143,7 +146,7 @@ public class MvccCounter implements Message { /** {@inheritDoc} */ @Override public short directType() { - return 141; + return 143; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java new file mode 100644 index 0000000..7963685 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +/** + * + */ +public class MvccEmptyLongList implements MvccLongList { + /** */ + public static MvccEmptyLongList INSTANCE = new MvccEmptyLongList(); + + /** + * + */ + private MvccEmptyLongList() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int size() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long get(int i) { + throw new IndexOutOfBoundsException(); + } + + /** {@inheritDoc} */ + @Override public boolean contains(long val) { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "MvccEmptyLongList[]"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java deleted file mode 100644 index d5172c6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public interface MvccQueryAware { - /** - * @param newCrd New coordinator. - * @return Version used by this query. - */ - @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd); - - /** - * @param topVer Topology version when version was requested. - */ - public void onMvccVersionReceived(AffinityTopologyVersion topVer); - - /** - * @param e Error. - */ - public void onMvccVersionError(IgniteCheckedException e); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java index 360af4c..ad933d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -23,15 +23,17 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; /** * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop */ -public class MvccQueryTracker { +public class MvccQueryTracker implements MvccCoordinatorChangeAware { /** */ private MvccCoordinator mvccCrd; @@ -47,14 +49,17 @@ public class MvccQueryTracker { /** */ @GridToStringExclude - private final MvccQueryAware lsnr; + private final IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr; /** * @param cctx Cache context. * @param canRemap {@code True} if can wait for topology changes. * @param lsnr Listener. */ - public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) { + public MvccQueryTracker(GridCacheContext cctx, + boolean canRemap, + IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr) + { assert cctx.mvccEnabled() : cctx.name(); this.cctx = cctx; @@ -115,13 +120,53 @@ public class MvccQueryTracker { } /** + * @param mvccInfo Mvcc update info. + * @param ctx Context. + * @param commit If {@code true} ack commit, otherwise rollback. + * @return Commit ack future. + */ + public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) { + MvccCoordinator mvccCrd0 = null; + MvccCoordinatorVersion mvccVer0 = null; + + synchronized (this) { + if (mvccVer != null) { + assert mvccCrd != null; + + mvccCrd0 = mvccCrd; + mvccVer0 = mvccVer; + + mvccVer = null; // Mark as finished. + } + } + + assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId()); + + if (mvccVer0 != null || mvccInfo != null) { + if (mvccInfo == null) { + cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); + + return null; + } + else { + if (commit) + return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0); + else + ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0); + } + } + + return null; + } + + /** * @param topVer Topology version. */ public void requestVersion(final AffinityTopologyVersion topVer) { MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer); if (mvccCrd0 == null) { - lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); + lsnr.apply(null, CacheCoordinatorsProcessor.noCoordinatorError(topVer)); return; } @@ -136,7 +181,7 @@ public class MvccQueryTracker { assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0; if (!canRemap) { - lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed.")); + lsnr.apply(null, new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed.")); return; } @@ -147,6 +192,7 @@ public class MvccQueryTracker { } } + // TODO IGNITE-3478: get rid of future creation in 'requestQueryCounter'. IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd0); @@ -172,7 +218,7 @@ public class MvccQueryTracker { } if (!needRemap) { - lsnr.onMvccVersionReceived(topVer); + lsnr.apply(topVer, null); return; } @@ -184,7 +230,7 @@ public class MvccQueryTracker { log.debug("Mvcc coordinator failed, need remap: " + e); } catch (IgniteCheckedException e) { - lsnr.onMvccVersionError(e); + lsnr.apply(null, e); return; } @@ -193,7 +239,7 @@ public class MvccQueryTracker { if (canRemap) waitNextTopology(topVer); else { - lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " + + lsnr.apply(null, new ClusterTopologyCheckedException("Failed to " + "request mvcc version, coordinator failed.")); } } @@ -218,7 +264,7 @@ public class MvccQueryTracker { requestVersion(fut.get()); } catch (IgniteCheckedException e) { - lsnr.onMvccVersionError(e); + lsnr.apply(null, e); } } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java index 700b27d..5c56f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java @@ -51,11 +51,11 @@ class PreviousCoordinatorQueries { private boolean initDone; /** - * @param srvNodesQueries Active queries started on server nodes. + * @param nodeQueries Active queries map. * @param discoCache Discovery data. * @param mgr Discovery manager. */ - void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) { + void init(Map<UUID, Map<MvccCounter, Integer>> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) { synchronized (this) { assert !initDone; assert waitNodes == null; @@ -63,14 +63,16 @@ class PreviousCoordinatorQueries { waitNodes = new HashSet<>(); for (ClusterNode node : discoCache.allNodes()) { - if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id())) + if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) && + mgr.alive(node) && + !F.contains(rcvd, node.id())) waitNodes.add(node.id()); } initDone = waitNodes.isEmpty(); - if (srvNodesQueries != null) { - for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet()) + if (nodeQueries != null) { + for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : nodeQueries.entrySet()) addAwaitedActiveQueries(e.getKey(), e.getValue()); } @@ -123,7 +125,7 @@ class PreviousCoordinatorQueries { * @param nodeId Node ID. * @param nodeQueries Active queries started on node. */ - void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) { + void addNodeActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) { synchronized (this) { if (initDone) return; @@ -158,23 +160,27 @@ class PreviousCoordinatorQueries { /** * @param nodeId Node ID. - * @param msg Message. + * @param crdVer Coordinator version. + * @param cntr Counter. */ - void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) { + void onQueryDone(UUID nodeId, long crdVer, long cntr) { + assert crdVer != 0; + assert cntr != CacheCoordinatorsProcessor.COUNTER_NA; + synchronized (this) { - MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter()); + MvccCounter mvccCntr = new MvccCounter(crdVer, cntr); Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId); if (nodeQueries == null) activeQueries.put(nodeId, nodeQueries = new HashMap<>()); - Integer qryCnt = nodeQueries.get(cntr); + Integer qryCnt = nodeQueries.get(mvccCntr); int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1; if (newQryCnt == 0) { - nodeQueries.remove(cntr); + nodeQueries.remove(mvccCntr); if (nodeQueries.isEmpty()) { activeQueries.remove(nodeId); @@ -184,7 +190,7 @@ class PreviousCoordinatorQueries { } } else - nodeQueries.put(cntr, newQryCnt); + nodeQueries.put(mvccCntr, newQryCnt); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java index 428d707..96a9864 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java @@ -29,6 +29,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class TxMvccInfo implements Message { /** */ + private static final long serialVersionUID = 0L; + + /** */ private UUID crd; /** */ @@ -42,8 +45,8 @@ public class TxMvccInfo implements Message { } /** - * @param crd - * @param mvccVer + * @param crd Coordinator node ID. + * @param mvccVer Mvcc version. */ public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) { assert crd != null; @@ -53,10 +56,28 @@ public class TxMvccInfo implements Message { this.mvccVer = mvccVer; } - public UUID coordinator() { + /** + * @return Instance with version without active transactions. + */ + public TxMvccInfo withoutActiveTransactions() { + MvccCoordinatorVersion mvccVer0 = mvccVer.withoutActiveTransactions(); + + if (mvccVer0 == mvccVer) + return this; + + return new TxMvccInfo(crd, mvccVer0); + } + + /** + * @return Coordinator node ID. + */ + public UUID coordinatorNodeId() { return crd; } + /** + * @return Mvcc version. + */ public MvccCoordinatorVersion version() { return mvccVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index e5a9736..5fc38ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1261,12 +1261,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer); + return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer); } /** {@inheritDoc} */ @@ -1276,10 +1277,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple KeyCacheObject key, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer); + return delegate.mvccUpdate(cctx, primary, key, val, ver, expireTime, mvccVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index b0cfa2d..5db0d49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; @@ -381,6 +382,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return mvccInfo; } + /** + * @return Mvcc version for update operation, should be always initialized if mvcc is enabled. + */ + @Nullable protected final MvccCoordinatorVersion mvccVersionForUpdate() { + assert !txState().mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this; + + return mvccInfo != null ? mvccInfo.version() : null; + } + /** {@inheritDoc} */ @Override public void mvccInfo(TxMvccInfo mvccInfo) { this.mvccInfo = mvccInfo; http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d8f911c..4321ebf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -520,8 +520,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig try { cctx.tm().txContext(this); - assert !txState.mvccEnabled(cctx) || mvccInfo != null; - AffinityTopologyVersion topVer = topologyVersion(); /* @@ -700,7 +698,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); if (updRes.success()) { txEntry.updateCounter(updRes.updatePartitionCounter()); @@ -733,7 +731,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); } } else if (op == DELETE) { @@ -755,7 +753,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); if (updRes.success()) { txEntry.updateCounter(updRes.updatePartitionCounter()); @@ -784,7 +782,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig resolveTaskName(), dhtVer, null, - mvccInfo != null ? mvccInfo.version() : null); + mvccVersionForUpdate()); } } else if (op == RELOAD) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index fc82cbb..31aa2ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -81,6 +81,9 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx); long mvccCntr = getMvccCounter(pageAddr, idx); + assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer; + assert mvccCntr != COUNTER_NA; + return ((CacheDataTree)tree).rowStore().mvccRow(cacheId, hash, link, @@ -122,15 +125,15 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i if (storeMvccVersion()) { long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx); - long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx); + long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx); - assert mvccTopVer > 0 : mvccTopVer; - assert mvcCntr != COUNTER_NA; + assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer; + assert mvccCntr != COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccTopVer); off += 8; - PageUtils.putLong(dstPageAddr, off, mvcCntr); + PageUtils.putLong(dstPageAddr, off, mvccCntr); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index c956d22..47d8a6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -99,7 +99,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx); long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx); - assert mvccUpdateTopVer >=0 : mvccUpdateCntr; + assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr; assert mvccUpdateCntr != COUNTER_NA; PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index 6309153..e8861bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -157,7 +157,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { cmp = compareKeys(row.key(), link); if (cmp != 0 || !grp.mvccEnabled()) - return 0; + return cmp; long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java index af11a9d..2785186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java @@ -37,9 +37,10 @@ public class MvccRemoveRow extends MvccUpdateRow { public MvccRemoveRow( KeyCacheObject key, MvccCoordinatorVersion mvccVer, + boolean needOld, int part, int cacheId) { - super(key, null, null, mvccVer, part, cacheId); + super(key, null, null, 0L, mvccVer, needOld, part, cacheId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java index 137ca28..fb2a6cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue; /** * @@ -54,6 +56,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C /** */ private final MvccCoordinatorVersion mvccVer; + /** */ + private final boolean needOld; + + /** */ + private CacheDataRow oldRow; + /** * @param key Key. * @param val Value. @@ -66,12 +74,22 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C KeyCacheObject key, CacheObject val, GridCacheVersion ver, + long expireTime, MvccCoordinatorVersion mvccVer, + boolean needOld, int part, int cacheId) { - super(key, val, ver, part, 0L, cacheId); + super(key, val, ver, part, expireTime, cacheId); this.mvccVer = mvccVer; + this.needOld = needOld; + } + + /** + * @return Old row. + */ + public CacheDataRow oldRow() { + return oldRow; } /** @@ -110,7 +128,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C if (cmp == 0) cmp = Long.compare(mvccVer.counter(), rowCntr); - // Can be equals if backup rebalanced value updated on primary. + // Can be equals if execute update on backup and backup already rebalanced value updated on primary. assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() + ", updCntr=" + mvccVer.counter() + ", rowCrd=" + rowCrdVer + @@ -148,9 +166,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C if (cmp == 0) res = UpdateResult.VERSION_FOUND; - else - res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ? + else { + if (versionForRemovedValue(rowCrdVerMasked)) + res = UpdateResult.PREV_NULL; + else { + res = UpdateResult.PREV_NOT_NULL; + + if (needOld) + oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); + } + res = versionForRemovedValue(rowCrdVerMasked) ? UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL; + } } // Suppose transactions on previous coordinator versions are done. http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 30145ab..e6300a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -88,7 +88,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.dr.GridDrType; @@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Version which is less then any version generated on coordinator. */ private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER = - new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L); + new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 859010e..58da451 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -187,8 +187,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { * @throws InterruptedException If interrupted. */ public void waitForBlocked() throws InterruptedException { + waitForBlocked(1); + } + + /** + * @param size Number of messages to wait for. + * @throws InterruptedException If interrupted. + */ + public void waitForBlocked(int size) throws InterruptedException { synchronized (this) { - while (blockedMsgs.isEmpty()) + while (blockedMsgs.size() < size) wait(); } }
