http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java new file mode 100644 index 0000000..f46d1e0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.noCoordinatorError; + +/** + * Tracker used for an optimistic tx and not-in-tx queries. It requests an mvcc snapshot, keeps it until {@code onDone} and, if allowed, remaps the request when the coordinator changes. + */ +@SuppressWarnings("unchecked") +public class MvccQueryTrackerImpl implements MvccQueryTracker { + /** Cache context. */ + @GridToStringExclude + private final GridCacheContext cctx; + + /** Logger obtained from the cache context for this class. */ + @GridToStringExclude + private final IgniteLogger log; + + /** Coordinator version recorded by {@code checkTopology}; reset to 0 to mark the tracker for remap. Accessed under synchronization on this tracker. */ + @GridToStringExclude + private long crdVer; + + /** Tracker id taken from {@code ID_CNTR}. */ + private final long id; + + /** Snapshot stored by {@code onResponse0}, {@code null} until then. Accessed under synchronization on this tracker. */ + private MvccSnapshot snapshot; + + /** Topology version stored by {@code checkTopology} once a coordinator is found for it. */ + private volatile AffinityTopologyVersion topVer; + + /** {@code True} if tracker can remap on coordinator fail. */ + private final boolean canRemap; + + /** + * @param cctx Cache context. + */ + public MvccQueryTrackerImpl(GridCacheContext cctx) { + this(cctx, true); + } + + /** + * @param cctx Cache context. + * @param canRemap {@code True} if tracker can remap on coordinator fail. 
+ */ + public MvccQueryTrackerImpl(GridCacheContext cctx, boolean canRemap) { + this.cctx = cctx; + this.id = ID_CNTR.incrementAndGet(); + this.canRemap = canRemap; + + log = cctx.logger(getClass()); + } + + /** {@inheritDoc} */ + @Override public long id() { + return id; + } + + /** {@inheritDoc} */ + @Override public synchronized MvccSnapshot snapshot() { + return snapshot; + } + + /** {@inheritDoc} */ + @Override public GridCacheContext context() { + return cctx; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} Returns a finished future when a snapshot is already held; otherwise requests one on the ready affinity version. */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot() { + MvccSnapshot snapshot; MvccSnapshotFuture fut; + + if ((snapshot = snapshot()) != null) + return new GridFinishedFuture<>(snapshot); + + requestSnapshot0(cctx.shared().exchange().readyAffinityVersion(), fut = new MvccSnapshotFuture()); + + return fut; + } + + /** {@inheritDoc} Returns a finished future when a snapshot is already held. */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot(@NotNull AffinityTopologyVersion topVer) { + MvccSnapshot snapshot; MvccSnapshotFuture fut; + + if ((snapshot = snapshot()) != null) + return new GridFinishedFuture<>(snapshot); + + requestSnapshot0(topVer, fut = new MvccSnapshotFuture()); + + return fut; + } + + /** {@inheritDoc} Notifies the listener right away when a snapshot is already held. */ + @Override public void requestSnapshot(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) { + MvccSnapshot snapshot = snapshot(); + + if (snapshot != null) + lsnr.onResponse(snapshot); + else + requestSnapshot0(topVer, lsnr); + } + + /** {@inheritDoc} Does nothing when no snapshot is held; otherwise removes this tracker from the mvcc processor and acks query completion. */ + @Override public void onDone() { + MvccProcessor prc = cctx.shared().coordinators(); + + MvccSnapshot snapshot = snapshot(); + + if (snapshot != null) { + prc.removeQueryTracker(id); + + prc.ackQueryDone(snapshot, id); + } + } + + /** {@inheritDoc} May return {@code null}: a future is returned only on commit, either finished (nothing to ack) or from {@code ackTxCommit}; rollback and the query-only ack return {@code null}. */ + @Override public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit) { + MvccSnapshot snapshot = 
snapshot(), txSnapshot = tx.mvccSnapshot(); + + if (snapshot == null && txSnapshot == null) + return commit ? new GridFinishedFuture<>() : null; + + MvccProcessor prc = cctx.shared().coordinators(); + + if (snapshot != null) + prc.removeQueryTracker(id); + + if (txSnapshot == null) + prc.ackQueryDone(snapshot, id); + else if (commit) + return prc.ackTxCommit(txSnapshot, snapshot, id); + else + prc.ackTxRollback(txSnapshot, snapshot, id); + + return null; + } + + /** {@inheritDoc} Returns the tracker id when a held snapshot was taken on a different coordinator version, otherwise {@code MVCC_TRACKER_ID_NA}. */ + @Override public synchronized long onMvccCoordinatorChange(MvccCoordinator newCrd) { + if (snapshot != null) { + assert crdVer != 0 : this; + + if (crdVer != newCrd.coordinatorVersion()) { + crdVer = newCrd.coordinatorVersion(); + + return id; + } + else + return MVCC_TRACKER_ID_NA; + } + else if (crdVer != 0) + crdVer = 0; // Mark for remap. + + return MVCC_TRACKER_ID_NA; + } + + /** Wraps the listener via {@code decorate}, validates the topology via {@code checkTopology} and, if valid, tries a local snapshot first, falling back to an asynchronous request. */ + private void requestSnapshot0(AffinityTopologyVersion topVer, MvccSnapshotResponseListener lsnr) { + if (checkTopology(topVer, lsnr = decorate(lsnr))) { + try { + MvccSnapshot snapshot = cctx.shared().coordinators().tryRequestSnapshotLocal(); + + if (snapshot == null) + cctx.shared().coordinators().requestSnapshotAsync(lsnr); + else + lsnr.onResponse(snapshot); + } + catch (ClusterTopologyCheckedException e) { + lsnr.onError(e); + } + } + } + + /** Wraps the listener into a {@code ListenerDecorator} unless it already is one. */ + private MvccSnapshotResponseListener decorate(MvccSnapshotResponseListener lsnr) { + assert lsnr != null; + + if (lsnr.getClass() == ListenerDecorator.class) + return lsnr; + + return new ListenerDecorator(lsnr); + } + + /** + * Validates if mvcc snapshot could be requested on the given topology; when it could not, the listener gets an error (no coordinator) or a remap is started (coordinator differs from the current one). + * + * @return {@code True} if topology is valid. 
+ */ + private boolean checkTopology(AffinityTopologyVersion topVer, MvccSnapshotResponseListener lsnr) { + MvccCoordinator crd = cctx.affinity().mvccCoordinator(topVer); + + if (crd == null) { + lsnr.onError(noCoordinatorError(topVer)); + + return false; + } + + this.topVer = topVer; + + synchronized (this) { + crdVer = crd.coordinatorVersion(); + } + + MvccCoordinator curCrd = cctx.topology().mvccCoordinator(); + + if (!crd.equals(curCrd)) { + assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0; + + tryRemap(lsnr); + + return false; + } + + return true; + } + + /** Fails the listener if remap is not allowed; otherwise re-requests the snapshot for the next minor topology version, immediately when {@code affinityReadyFuture} returns {@code null} and from its listener otherwise. */ + private void tryRemap(MvccSnapshotResponseListener lsnr) { + if (!canRemap) { + lsnr.onError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator failed.")); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> waitFut = + cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion()); + + if (waitFut == null) + requestSnapshot(cctx.shared().exchange().readyAffinityVersion(), lsnr); + else { + waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + try { + requestSnapshot(fut.get(), lsnr); + } + catch (IgniteCheckedException e) { + lsnr.onError(e); + } + } + }); + } + } + + /** + * @param res Received snapshot; stored unless the tracker was marked for remap ({@code crdVer == 0}). + * @param lsnr Response listener. + * @return {@code false} if need to remap, {@code true} after the tracker was registered in the mvcc processor. + */ + private boolean onResponse0(@NotNull MvccSnapshot res, MvccSnapshotResponseListener lsnr) { + boolean needRemap = false; + + synchronized (this) { + assert snapshot() == null : "[this=" + this + ", rcvdVer=" + res + "]"; + + if (crdVer != 0) { + this.snapshot = res; + } + else + needRemap = true; + } + + if (needRemap) { // Coordinator failed or reassigned, need remap. + tryRemap(lsnr); + + return false; + } + + cctx.shared().coordinators().addQueryTracker(this); + + return true; + } + + /** + * @param e Exception. 
+ * @param lsnr Response listener. + * @return {@code false} if a remap was started, {@code true} if the error should be passed on to the wrapped listener. + */ + private boolean onError0(IgniteCheckedException e, MvccSnapshotResponseListener lsnr) { + if (e instanceof ClusterTopologyCheckedException && canRemap) { + if (e instanceof ClusterTopologyServerNotFoundException) + return true; // No Mvcc coordinator assigned + + if (log.isDebugEnabled()) + log.debug("Mvcc coordinator failed, need remap: " + e); + + tryRemap(lsnr); + + return false; + } + + return true; + } + + /** Listener wrapper that routes responses and errors through {@code onResponse0} and {@code onError0} before notifying the wrapped listener. */ + private final class ListenerDecorator implements MvccSnapshotResponseListener { + /** Wrapped listener. */ + private final MvccSnapshotResponseListener lsnr; + + /** @param lsnr Listener to wrap. */ + private ListenerDecorator(MvccSnapshotResponseListener lsnr) { + this.lsnr = lsnr; + } + + @Override public void onResponse(MvccSnapshot res) { + if (onResponse0(res, this)) + lsnr.onResponse(res); + } + + @Override public void onError(IgniteCheckedException e) { + if (onError0(e, this)) + lsnr.onError(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshot.java new file mode 100644 index 0000000..5ed743a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshot.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * MVCC snapshot which holds the following information: + * <ul><li>Current MVCC version which should be used for visibility checks.</li> + * <li>List of active transactions which should not be visible to the current transaction.</li> + * <li>Cleanup version which is used to help the vacuum process.</li></ul> + */ +public interface MvccSnapshot extends MvccVersion, Message { + /** + * @return Active transactions, i.e. those that should not be visible to the current transaction. 
+ */ + public MvccLongList activeTransactions(); + + /** + * @return Cleanup version (all smaller versions are safe to remove). + */ + public long cleanupVersion(); + + /** + * @return Snapshot without the list of active transactions. + */ + public MvccSnapshot withoutActiveTransactions(); + + /** + * Increments the operation counter of this snapshot. + */ + public void incrementOperationCounter(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotFuture.java new file mode 100644 index 0000000..934ff2f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotFuture.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Future for an mvcc snapshot request which is its own response listener: it completes with the received snapshot or with the reported error. + */ +public class MvccSnapshotFuture extends MvccFuture<MvccSnapshot> implements MvccSnapshotResponseListener { + /** {@inheritDoc} Completes this future with the given (non-null) snapshot. */ + @Override public void onResponse(MvccSnapshot res) { + assert res != null; + + onDone(res); + } + + /** {@inheritDoc} Completes this future with the given error. */ + @Override public void onError(IgniteCheckedException err) { + onDone(err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccSnapshotFuture.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotResponseListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotResponseListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotResponseListener.java new file mode 100644 index 0000000..e0bf448 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotResponseListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Listener notified with the outcome of an mvcc snapshot request: either the snapshot or an error. + */ +public interface MvccSnapshotResponseListener { + /** + * @param res Received mvcc snapshot. + */ + public void onResponse(MvccSnapshot res); + + /** + * @param e Error. + */ + public void onError(IgniteCheckedException e); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotWithoutTxs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotWithoutTxs.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotWithoutTxs.java new file mode 100644 index 0000000..5be6317 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccSnapshotWithoutTxs.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Mvcc snapshot implementation that carries no list of active transactions: {@link #activeTransactions()} always returns the empty list. + */ +public class MvccSnapshotWithoutTxs implements MvccSnapshot { + /** */ + private static final long serialVersionUID = 0L; + + /** Coordinator version. */ + private long crdVer; + + /** Counter. */ + private long cntr; + + /** Cleanup version. */ + private long cleanupVer; + + /** Operation counter. */ + private int opCntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccSnapshotWithoutTxs() { + // No-op. + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + * @param opCntr Operation counter. + * @param cleanupVer Cleanup version. 
+ */ + public MvccSnapshotWithoutTxs(long crdVer, long cntr, int opCntr, long cleanupVer) { + this.crdVer = crdVer; + this.cntr = cntr; + this.cleanupVer = cleanupVer; + this.opCntr = opCntr; + } + + /** {@inheritDoc} Always the empty list in this implementation. */ + @Override public MvccLongList activeTransactions() { + return MvccEmptyLongList.INSTANCE; + } + + /** {@inheritDoc} */ + @Override public long coordinatorVersion() { + return crdVer; + } + + /** {@inheritDoc} */ + @Override public long cleanupVersion() { + return cleanupVer; + } + + /** {@inheritDoc} */ + @Override public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public int operationCounter() { + return opCntr; + } + + /** {@inheritDoc} Not supported by this implementation: always throws {@link UnsupportedOperationException}. */ + @Override public void incrementOperationCounter() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} Returns this instance. */ + @Override public MvccSnapshot withoutActiveTransactions() { + return this; + } + + /** {@inheritDoc} Fields are written in the order cleanupVer, cntr, crdVer, opCntr; each case falls through to the next, so writing resumes at {@code writer.state()}. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cleanupVer", cleanupVer)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeInt("opCntr", opCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Fields are read in the same order as {@code writeTo} writes them; each case falls through to the next, so reading resumes at {@code reader.state()}. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cleanupVer = reader.readLong("cleanupVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + cntr = 
reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + opCntr = reader.readInt("opCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccSnapshotWithoutTxs.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 150; + } + + /** {@inheritDoc} Matches the four fields handled by {@code writeTo} and {@code readFrom}. */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccSnapshotWithoutTxs.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersionAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersionAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersionAware.java new file mode 100644 index 0000000..17804c4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersionAware.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +/** + * Interface for objects that are aware of their mvcc update version. + */ +public interface MvccUpdateVersionAware { + /** + * @return New mvcc coordinator version. + */ + public long newMvccCoordinatorVersion(); + + /** + * @return New mvcc counter. + */ + public long newMvccCounter(); + + /** + * @return New mvcc operation counter. + */ + public int newMvccOperationCounter(); + + /** + * @return New Tx state. + */ + public byte newMvccTxState(); + + /** + * Copies the new MVCC version from another object by delegating to the three-argument setter. + * @param other Object to copy version from. + */ + public default void newMvccVersion(MvccUpdateVersionAware other) { + newMvccVersion(other.newMvccCoordinatorVersion(), other.newMvccCounter(), other.newMvccOperationCounter()); + } + + /** + * Sets the new MVCC version by delegating to the three-argument setter. + * @param ver MVCC version. + */ + public default void newMvccVersion(MvccVersion ver) { + newMvccVersion(ver.coordinatorVersion(), ver.counter(), ver.operationCounter()); + } + + /** + * Sets new mvcc version. The default implementation throws {@link UnsupportedOperationException}. + * @param crd New mvcc coordinator version. + * @param cntr New mvcc counter. + * @param opCntr New mvcc operation counter. + */ + public default void newMvccVersion(long crd, long cntr, int opCntr) { + throw new UnsupportedOperationException(); + } + + /** + * @return New mvcc version, built as a new {@code MvccVersionImpl} from the three component getters. 
+ */ + public default MvccVersion newMvccVersion() { + return new MvccVersionImpl(newMvccCoordinatorVersion(), newMvccCounter(), newMvccOperationCounter()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java new file mode 100644 index 0000000..33f457d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.transactions.IgniteTxMvccVersionCheckedException; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.transactions.TransactionState; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId; +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; +import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE; +import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_BIT_OFF; +import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_MASK; +import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.TRANSACTION_COMPLETED; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Utils for MVCC. + */ +public class MvccUtils { + /** Coordinator version of an unassigned version (such versions are never visible, see {@code isVisible}). */ + public static final long MVCC_CRD_COUNTER_NA = 0L; + /** Coordinator version component of {@link #INITIAL_VERSION}. */ + public static final long MVCC_CRD_START_CNTR = 1L; + /** Counter component of {@link #MVCC_VERSION_NA}. */ + public static final long MVCC_COUNTER_NA = 0L; + /** Counter component of {@link #INITIAL_VERSION}. */ + public static final long MVCC_INITIAL_CNTR = 1L; + /** */ + public static final long MVCC_START_CNTR = 3L; + /** Operation counter component of {@link #MVCC_VERSION_NA}. */ + public static final int MVCC_OP_COUNTER_NA = 0; + /** Operation counter component of {@link #INITIAL_VERSION}. */ + public static final int MVCC_START_OP_CNTR = 1; + /** Bitwise complement of {@code MVCC_HINTS_MASK}. */ + public static final int MVCC_READ_OP_CNTR = ~MVCC_HINTS_MASK; + + /** */ + public static final int MVCC_INVISIBLE = 0; + /** */ + public static final int MVCC_VISIBLE_REMOVED = 1; + /** */ + public static final int MVCC_VISIBLE = 2; + + /** Initial version; {@code state} reports it as committed and {@code isVisible} reports it as visible. */ + public static final MvccVersion INITIAL_VERSION = + mvccVersion(MVCC_CRD_START_CNTR, MVCC_INITIAL_CNTR, MVCC_START_OP_CNTR); + + /** Version whose three components are the corresponding NA constants. */ + public static final MvccVersion MVCC_VERSION_NA = + mvccVersion(MVCC_CRD_COUNTER_NA, MVCC_COUNTER_NA, MVCC_OP_COUNTER_NA); + + /** */ + private static final MvccClosure<Integer> getVisibleState = new GetVisibleState(); + + /** */ + private static final MvccClosure<Boolean> isVisible = new IsVisible(); + + /** */ + private static final MvccClosure<MvccVersion> getNewVer = new GetNewVersion(); + + /** + * Private constructor: instances cannot be created from outside this class. + */ + private MvccUtils(){ + } + + /** + * @param ctx Kernal context. + * @return Newly created Mvcc processor: {@code MvccProcessorImpl} if {@code mvccEnabled(ctx)}, {@code NoOpMvccProcessor} otherwise. + */ + public static MvccProcessor createProcessor(GridKernalContext ctx) { + return mvccEnabled(ctx) ? 
new MvccProcessorImpl(ctx) : new NoOpMvccProcessor(ctx); + } + + /** + * @param cctx Cache context. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param snapshot Snapshot. + * @return {@code True} if transaction is active: its version is not visible from the snapshot (checked without TxLog) and it is either neither committed nor aborted or still known as a local transaction. + * @see TxState + * @throws IgniteCheckedException If failed. + */ + public static boolean isActive(GridCacheContext cctx, long mvccCrd, long mvccCntr, MvccSnapshot snapshot) + throws IgniteCheckedException { + if (isVisible(cctx, snapshot, mvccCrd, mvccCntr, MVCC_OP_COUNTER_NA, false)) + return false; + + byte state = state(cctx, mvccCrd, mvccCntr, 0); + + return state != TxState.COMMITTED && state != TxState.ABORTED + || cctx.kernalContext().coordinators().hasLocalTransaction(mvccCrd, mvccCntr); + } + + /** + * @param cctx Cache context. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param mvccOpCntr Mvcc operation counter. + * @return Tx state byte, resolved through the mvcc processor of the kernal context. + * @see TxState + * @throws IgniteCheckedException If failed. + */ + public static byte state(GridCacheContext cctx, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + return state(cctx.kernalContext().coordinators(), mvccCrd, mvccCntr, mvccOpCntr); + } + + /** + * @param grp Cache group context. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param mvccOpCntr Mvcc operation counter. + * @return Tx state byte, resolved through the mvcc processor of the shared context. + * @see TxState + * @throws IgniteCheckedException If failed. + */ + public static byte state(CacheGroupContext grp, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + return state(grp.shared().coordinators(), mvccCrd, mvccCntr, mvccOpCntr); + } + + /** + * @param proc Mvcc processor. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param mvccOpCntr Mvcc operation counter; when it has hint bits set, the state is decoded from it instead of asking the processor. + * @return Tx state byte. + * @see TxState + * @throws IgniteCheckedException If failed. 
+ */ + private static byte state(MvccProcessor proc, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + if (compare(INITIAL_VERSION, mvccCrd, mvccCntr, mvccOpCntr) == 0) + return TxState.COMMITTED; // Initial version is always committed; + + if ((mvccOpCntr & MVCC_HINTS_MASK) != 0) + return (byte)(mvccOpCntr >>> MVCC_HINTS_BIT_OFF); + + return proc.state(mvccCrd, mvccCntr); + } + + /** + * Checks if version is visible from the given snapshot. Delegates to the six-argument overload with {@code useTxLog} set to {@code true}. + * + * @param cctx Cache context. + * @param snapshot Snapshot. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param opCntr Operation counter. + * @return {@code True} if visible. + * @throws IgniteCheckedException If failed. + */ + public static boolean isVisible(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, + int opCntr) throws IgniteCheckedException { + return isVisible(cctx, snapshot, mvccCrd, mvccCntr, opCntr, true); + } + + /** + * Checks if version is visible from the given snapshot. An unassigned version is never visible and the initial version always is; otherwise the version is compared with the snapshot by coordinator version, counter and operation counter. + * + * @param cctx Cache context. + * @param snapshot Snapshot. + * @param mvccCrd Mvcc coordinator version. + * @param mvccCntr Mvcc counter. + * @param opCntr Operation counter. + * @param useTxLog {@code True} if TxLog should be used; if {@code false} the row is expected to be committed and its state is not checked. + * @return {@code True} if visible. + * @throws IgniteCheckedException If failed, in particular if the row state read from TxLog is neither committed nor aborted. 
+ */ + public static boolean isVisible(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, + int opCntr, boolean useTxLog) throws IgniteCheckedException { + if (mvccCrd == MVCC_CRD_COUNTER_NA) { + assert mvccCntr == MVCC_COUNTER_NA && opCntr == MVCC_OP_COUNTER_NA + : "rowVer=" + mvccVersion(mvccCrd, mvccCntr, opCntr) + ", snapshot=" + snapshot; + + return false; // Unassigned version is always invisible + } + + if (compare(INITIAL_VERSION, mvccCrd, mvccCntr, opCntr) == 0) + return true; // Initial version is always visible + + long snapshotCrd = snapshot.coordinatorVersion(); + + long snapshotCntr = snapshot.counter(); + int snapshotOpCntr = snapshot.operationCounter(); + + if (mvccCrd > snapshotCrd) + return false; // Rows in the future are never visible. + + if (mvccCrd < snapshotCrd) + // Don't check the row with TxLog if the row is expected to be committed. + return !useTxLog || isCommitted(cctx, mvccCrd, mvccCntr, opCntr); + + if (mvccCntr > snapshotCntr) // we don't see future updates + return false; + + if (mvccCntr == snapshotCntr) { + assert opCntr <= snapshotOpCntr : "rowVer=" + mvccVersion(mvccCrd, mvccCntr, opCntr) + ", snapshot=" + snapshot; + + return opCntr < snapshotOpCntr; // we don't see own pending updates + } + + if (snapshot.activeTransactions().contains(mvccCntr)) // we don't see of other transactions' pending updates + return false; + + if (!useTxLog) + return true; // The checking row is expected to be committed. + + byte state = state(cctx, mvccCrd, mvccCntr, opCntr); + + if (state != TxState.COMMITTED && state != TxState.ABORTED) + throw unexpectedStateException(cctx, state, mvccCrd, mvccCntr, opCntr, snapshot); + + return state == TxState.COMMITTED; + } + + /** + * + * @param grp Cache group context. + * @param state State. + * @param crd Mvcc coordinator counter. + * @param cntr Mvcc counter. + * @param opCntr Mvcc operation counter. + * @return State exception. 
+ */ + public static IgniteTxMvccVersionCheckedException unexpectedStateException( + CacheGroupContext grp, byte state, long crd, long cntr, + int opCntr) { + return unexpectedStateException(grp.shared().kernalContext(), state, crd, cntr, opCntr, null); + } + + /** + * + * @param cctx Cache context. + * @param state State. + * @param crd Mvcc coordinator counter. + * @param cntr Mvcc counter. + * @param opCntr Mvcc operation counter. + * @param snapshot Mvcc snapshot + * @return State exception. + */ + public static IgniteTxMvccVersionCheckedException unexpectedStateException( + GridCacheContext cctx, byte state, long crd, long cntr, + int opCntr, MvccSnapshot snapshot) { + return unexpectedStateException(cctx.kernalContext(), state, crd, cntr, opCntr, snapshot); + } + + /** */ + private static IgniteTxMvccVersionCheckedException unexpectedStateException(GridKernalContext ctx, byte state, long crd, long cntr, + int opCntr, MvccSnapshot snapshot) { + String msg = "Unexpected state: [state=" + state + ", rowVer=" + crd + ":" + cntr + ":" + opCntr; + + if (snapshot != null) + msg += ", txVer=" + snapshot.coordinatorVersion() + ":" + snapshot.counter() + ":" + snapshot.operationCounter(); + + msg += ", localNodeId=" + ctx.localNodeId() + "]"; + + return new IgniteTxMvccVersionCheckedException(msg); + } + + /** + * Checks visibility of the given row versions from the given snapshot. + * + * @param cctx Context. + * @param snapshot Snapshot. + * @param crd Mvcc coordinator counter. + * @param cntr Mvcc counter. + * @param opCntr Mvcc operation counter. + * @param link Link to data row (new version is located there). + * @return Visibility status. + * @throws IgniteCheckedException If failed. 
+ */ + public static boolean isVisible(GridCacheContext cctx, MvccSnapshot snapshot, long crd, long cntr, + int opCntr, long link) throws IgniteCheckedException { + return isVisible(cctx, snapshot, crd, cntr, opCntr, false) + && isVisible(cctx, link, snapshot); + } + + /** + * Checks if a row has not empty new version (xid_max). + * + * @param row Row. + * @return {@code True} if row has a new version. + */ + public static boolean hasNewVersion(MvccUpdateVersionAware row) { + assert row.newMvccCoordinatorVersion() == MVCC_CRD_COUNTER_NA + || mvccVersionIsValid(row.newMvccCoordinatorVersion(), row.newMvccCounter(), row.newMvccOperationCounter()); + + return row.newMvccCoordinatorVersion() > MVCC_CRD_COUNTER_NA; + } + + /** + * Checks if a row's new version is visible for the given snapshot. + * + * @param cctx Cache context. + * @param link Link to the row. + * @param snapshot Mvcc snapshot. + * @return {@code True} if row is visible for the given snapshot. + * @throws IgniteCheckedException If failed. + */ + public static int getVisibleState(GridCacheContext cctx, long link, MvccSnapshot snapshot) + throws IgniteCheckedException { + return invoke(cctx, link, getVisibleState, snapshot); + } + + /** + * Returns new version of row (xid_max) if any. + * + * @param cctx Cache context. + * @param link Link to the row. + * @return New {@code MvccVersion} if row has xid_max, or null if doesn't. + * @throws IgniteCheckedException If failed. + */ + public static MvccVersion getNewVersion(GridCacheContext cctx, long link) + throws IgniteCheckedException { + return invoke(cctx, link, getNewVer, null); + } + + /** + * Compares row version (xid_min) with the given version. + * + * @param row Row. + * @param ver Version. + * @return Comparison result, see {@link Comparable}. 
+ */ + public static int compare(MvccVersionAware row, MvccVersion ver) { + return compare(row.mvccCoordinatorVersion(), row.mvccCounter(), row.mvccOperationCounter(), + ver.coordinatorVersion(), ver.counter(), ver.operationCounter()); + } + + /** + * Compares to pairs of MVCC versions. See {@link Comparable}. + * + * @param mvccVerLeft First MVCC version. + * @param mvccCrdRight Second coordinator version. + * @param mvccCntrRight Second counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(MvccVersion mvccVerLeft, long mvccCrdRight, long mvccCntrRight) { + return compare(mvccVerLeft.coordinatorVersion(), mvccVerLeft.counter(), mvccCrdRight, mvccCntrRight); + } + + /** + * Compares to pairs of MVCC versions. See {@link Comparable}. + * + * @param row First MVCC version. + * @param mvccCrdRight Second coordinator version. + * @param mvccCntrRight Second counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(MvccVersionAware row, long mvccCrdRight, long mvccCntrRight) { + return compare(row.mvccCoordinatorVersion(), row.mvccCounter(), mvccCrdRight, mvccCntrRight); + } + + /** + * Compares to pairs of MVCC versions. See {@link Comparable}. + * + * @param mvccVerLeft First MVCC version. + * @param mvccCrdRight Second coordinator version. + * @param mvccCntrRight Second counter. + * @param mvccOpCntrRight Second operation counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(MvccVersion mvccVerLeft, long mvccCrdRight, long mvccCntrRight, int mvccOpCntrRight) { + return compare(mvccVerLeft.coordinatorVersion(), mvccVerLeft.counter(), + mvccVerLeft.operationCounter(), mvccCrdRight, mvccCntrRight, mvccOpCntrRight); + } + + /** + * Compares to pairs of coordinator/counter versions. See {@link Comparable}. + * + * @param mvccCrdLeft First coordinator version. + * @param mvccCntrLeft First counter version. 
+ * @param mvccOpCntrLeft First operation counter. + * @param other The object to compare with. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(long mvccCrdLeft, long mvccCntrLeft, int mvccOpCntrLeft, MvccVersionAware other) { + return compare(mvccCrdLeft, mvccCntrLeft, mvccOpCntrLeft, + other.mvccCoordinatorVersion(), other.mvccCounter(), other.mvccOperationCounter()); + } + + /** + * Compares to pairs of coordinator/counter versions. See {@link Comparable}. + * + * @param mvccCrdLeft First coordinator version. + * @param mvccCntrLeft First counter version. + * @param mvccCrdRight Second coordinator version. + * @param mvccCntrRight Second counter version. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(long mvccCrdLeft, long mvccCntrLeft, long mvccCrdRight, long mvccCntrRight) { + return compare(mvccCrdLeft, mvccCntrLeft, 0, mvccCrdRight, mvccCntrRight, 0); + } + + /** + * Compares to pairs of coordinator/counter versions. See {@link Comparable}. + * + * @param mvccCrdLeft First coordinator version. + * @param mvccCntrLeft First counter version. + * @param mvccOpCntrLeft First operation counter. + * @param mvccCrdRight Second coordinator version. + * @param mvccCntrRight Second counter version. + * @param mvccOpCntrRight Second operation counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compare(long mvccCrdLeft, long mvccCntrLeft, int mvccOpCntrLeft, long mvccCrdRight, + long mvccCntrRight, int mvccOpCntrRight) { + int cmp; + + if ((cmp = Long.compare(mvccCrdLeft, mvccCrdRight)) != 0 + || (cmp = Long.compare(mvccCntrLeft, mvccCntrRight)) != 0 + || (cmp = Integer.compare(mvccOpCntrLeft & ~MVCC_HINTS_MASK, mvccOpCntrRight & ~MVCC_HINTS_MASK)) != 0) + return cmp; + + return 0; + } + + /** + * Compares new row version (xid_max) with the given counter and coordinator versions. + * + * @param row Row. + * @param mvccCrd Mvcc coordinator. 
+ * @param mvccCntr Mvcc counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compareNewVersion(MvccUpdateVersionAware row, long mvccCrd, long mvccCntr) { + return compare(row.newMvccCoordinatorVersion(), row.newMvccCounter(), mvccCrd, mvccCntr); + } + + /** + * Compares new row version (xid_max) with the given counter and coordinator versions. + * + * @param row Row. + * @param mvccCrd Mvcc coordinator. + * @param mvccCntr Mvcc counter. + * @param opCntr Mvcc operation counter. + * @return Comparison result, see {@link Comparable}. + */ + public static int compareNewVersion(MvccUpdateVersionAware row, long mvccCrd, long mvccCntr, int opCntr) { + return compare(row.newMvccCoordinatorVersion(), row.newMvccCounter(), row.newMvccOperationCounter(), mvccCrd, mvccCntr, opCntr); + } + + /** + * Compares new row version (xid_max) with the given version. + * + * @param row Row. + * @param ver Version. + * @return Comparison result, see {@link Comparable}. + */ + public static int compareNewVersion(MvccUpdateVersionAware row, MvccVersion ver) { + return compare(row.newMvccCoordinatorVersion(), row.newMvccCounter(), row.newMvccOperationCounter(), + ver.coordinatorVersion(), ver.counter(), ver.operationCounter()); + } + + /** + * @param crdVer Mvcc coordinator version. + * @param cntr Counter. + * @param opCntr Operation counter. + * @return Always {@code true}. + */ + public static boolean mvccVersionIsValid(long crdVer, long cntr, int opCntr) { + return mvccVersionIsValid(crdVer, cntr) && opCntr != MVCC_OP_COUNTER_NA; + } + + /** + * @param crdVer Mvcc coordinator version. + * @param cntr Counter. + * @return {@code True} if version is valid. + */ + public static boolean mvccVersionIsValid(long crdVer, long cntr) { + return crdVer > MVCC_CRD_COUNTER_NA && cntr != MVCC_COUNTER_NA; + } + + /** + * @param topVer Topology version for cache operation. + * @return Error. 
+ */ + public static ClusterTopologyServerNotFoundException noCoordinatorError(AffinityTopologyVersion topVer) { + return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned for " + + "topology version: " + topVer); + } + + /** + * @return Error. + */ + public static ClusterTopologyServerNotFoundException noCoordinatorError() { + return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned."); + } + + /** + * @param cctx Cache context. + * @param link Link to the row. + * @param snapshot Mvcc snapshot. + * @return {@code True} if row is updated for given snapshot. + * @throws IgniteCheckedException If failed. + */ + private static boolean isVisible(GridCacheContext cctx, long link, + MvccSnapshot snapshot) + throws IgniteCheckedException { + return invoke(cctx, link, isVisible, snapshot); + } + + /** + * Encapsulates common logic for working with row mvcc info: page locking/unlocking, checks and other. + * Strategy pattern. + * + * @param cctx Cache group. + * @param link Row link. + * @param clo Closure to apply. + * @param snapshot Mvcc snapshot. + * @param <R> Return type. + * @return Result. + * @throws IgniteCheckedException If failed. 
+ */ + private static <R> R invoke(GridCacheContext cctx, long link, MvccClosure<R> clo, MvccSnapshot snapshot) + throws IgniteCheckedException { + assert cctx.mvccEnabled(); + + PageMemory pageMem = cctx.dataRegion().pageMemory(); + int grpId = cctx.groupId(); + + long pageId = pageId(link); + long page = pageMem.acquirePage(grpId, pageId); + + try { + long pageAddr = pageMem.readLock(grpId, pageId, page); + + try{ + DataPageIO dataIo = DataPageIO.VERSIONS.forPage(pageAddr); + + int offset = dataIo.getPayloadOffset(pageAddr, itemId(link), pageMem.pageSize(), MVCC_INFO_SIZE); + + long mvccCrd = dataIo.mvccCoordinator(pageAddr, offset); + long mvccCntr = dataIo.mvccCounter(pageAddr, offset); + int mvccOpCntr = dataIo.mvccOperationCounter(pageAddr, offset); + + assert mvccVersionIsValid(mvccCrd, mvccCntr, mvccOpCntr) : mvccVersion(mvccCrd, mvccCntr, mvccOpCntr); + + long newMvccCrd = dataIo.newMvccCoordinator(pageAddr, offset); + long newMvccCntr = dataIo.newMvccCounter(pageAddr, offset); + int newMvccOpCntr = dataIo.newMvccOperationCounter(pageAddr, offset); + + assert newMvccCrd == MVCC_CRD_COUNTER_NA || mvccVersionIsValid(newMvccCrd, newMvccCntr, newMvccOpCntr) + : mvccVersion(newMvccCrd, newMvccCntr, newMvccOpCntr); + + return clo.apply(cctx, snapshot, mvccCrd, mvccCntr, mvccOpCntr, newMvccCrd, newMvccCntr, newMvccOpCntr); + } + finally { + pageMem.readUnlock(grpId, pageId, page); + } + } + finally { + pageMem.releasePage(grpId, pageId, page); + } + } + + /** + * + * @param cctx Cache context. + * @param mvccCrd Coordinator version. + * @param mvccCntr Counter. + * @return {@code True} in case the corresponding transaction is in {@code TxState.COMMITTED} state. + * @throws IgniteCheckedException If failed. 
+ */ + private static boolean isCommitted(GridCacheContext cctx, long mvccCrd, long mvccCntr, int mvccOpCntr) throws IgniteCheckedException { + return state(cctx, mvccCrd, mvccCntr, mvccOpCntr) == TxState.COMMITTED; + } + + /** + * Throw an {@link UnsupportedOperationException} if this cache is transactional and MVCC is enabled with + * appropriate message about corresponding operation type. + * @param cctx Cache context. + * @param opType operation type to mention in error message. + */ + public static void verifyMvccOperationSupport(GridCacheContext<?, ?> cctx, String opType) { + if (cctx.mvccEnabled()) + throw new UnsupportedOperationException(opType + " operations are not supported on transactional " + + "caches when MVCC is enabled."); + } + + /** + * Checks transaction state. + * @param tx Transaction. + * @return Checked transaction. + */ + public static GridNearTxLocal checkActive(GridNearTxLocal tx) { + if (tx != null && tx.state() != TransactionState.ACTIVE) + throw new IgniteSQLException("Transaction is already completed.", TRANSACTION_COMPLETED); + + return tx; + } + + + /** + * @param ctx Grid kernal context. + * @return Currently started user transaction, or {@code null} if none started. + */ + @Nullable public static GridNearTxLocal tx(GridKernalContext ctx) { + return tx(ctx, null); + } + + /** + * @param ctx Grid kernal context. + * @param txId Transaction ID. + * @return Currently started user transaction, or {@code null} if none started. + */ + @Nullable public static GridNearTxLocal tx(GridKernalContext ctx, @Nullable GridCacheVersion txId) { + IgniteTxManager tm = ctx.cache().context().tm(); + + IgniteInternalTx tx0 = txId == null ? tm.tx() : tm.tx(txId); + + GridNearTxLocal tx = tx0 != null && tx0.user() ? 
(GridNearTxLocal)tx0 : null; + + if (tx != null) { + if (!tx.pessimistic() || !tx.repeatableRead()) { + tx.setRollbackOnly(); + + throw new IgniteSQLException("Only pessimistic repeatable read transactions are supported at the moment.", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + } + + if (!tx.isOperationAllowed(true)) { + tx.setRollbackOnly(); + + throw new IgniteSQLException("SQL queries and cache operations " + + "may not be used in the same transaction.", IgniteQueryErrorCode.TRANSACTION_TYPE_MISMATCH); + } + } + + return tx; + } + + + /** + * @param ctx Grid kernal context. + * @param timeout Transaction timeout. + * @return Newly started SQL transaction. + */ + public static GridNearTxLocal txStart(GridKernalContext ctx, long timeout) { + return txStart(ctx, null, timeout); + } + + /** + * @param cctx Cache context. + * @param timeout Transaction timeout. + * @return Newly started SQL transaction. + */ + public static GridNearTxLocal txStart(GridCacheContext cctx, long timeout) { + return txStart(cctx.kernalContext(), cctx, timeout); + } + + /** + * @param ctx Grid kernal context. + * @param cctx Cache context. + * @param timeout Transaction timeout. + * @return Newly started SQL transaction. + */ + private static GridNearTxLocal txStart(GridKernalContext ctx, @Nullable GridCacheContext cctx, long timeout) { + if (timeout == 0) { + TransactionConfiguration tcfg = cctx != null ? + CU.transactionConfiguration(cctx, ctx.config()) : null; + + if (tcfg != null) + timeout = tcfg.getDefaultTxTimeout(); + } + + GridNearTxLocal tx = ctx.cache().context().tm().newTx( + false, + false, + cctx != null && cctx.systemTx() ? cctx : null, + PESSIMISTIC, + REPEATABLE_READ, + timeout, + cctx == null || !cctx.skipStore(), + true, + 0, + null + ); + + tx.syncMode(FULL_SYNC); + + return tx; + } + + /** + * @param ctx Grid kernal context. + * @return Whether MVCC is enabled or not on {@link IgniteConfiguration}. 
+ */ + public static boolean mvccEnabled(GridKernalContext ctx) { + return ctx.config().isMvccEnabled(); + } + + /** + * Initialises MVCC filter and returns MVCC query tracker if needed. + * @param cctx Cache context. + * @param startTx Start transaction flag. + * @return MVCC query tracker. + * @throws IgniteCheckedException If failed. + */ + @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean startTx) throws IgniteCheckedException { + assert cctx != null && cctx.mvccEnabled(); + + GridNearTxLocal tx = tx(cctx.kernalContext()); + + if (tx == null && startTx) + tx = txStart(cctx, 0); + + return mvccTracker(cctx, tx); + } + + /** + * Initialises MVCC filter and returns MVCC query tracker if needed. + * @param cctx Cache context. + * @param tx Transaction. + * @return MVCC query tracker. + * @throws IgniteCheckedException If failed. + */ + @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, + GridNearTxLocal tx) throws IgniteCheckedException { + MvccQueryTracker tracker; + + if (tx == null) + tracker = new MvccQueryTrackerImpl(cctx); + else if ((tracker = tx.mvccQueryTracker()) == null) + tracker = new StaticMvccQueryTracker(cctx, requestSnapshot(cctx, tx)) { + @Override public void onDone() { + // TODO IGNITE-8841 + checkActive(tx); + } + }; + + if (tracker.snapshot() == null) + // TODO IGNITE-7388 + tracker.requestSnapshot().get(); + + return tracker; + } + + /** + * @param cctx Cache context. + * @param tx Transaction. + * @throws IgniteCheckedException If failed. + * @return Mvcc snapshot. 
+ */ + public static MvccSnapshot requestSnapshot(GridCacheContext cctx, + GridNearTxLocal tx) throws IgniteCheckedException { + MvccSnapshot snapshot; tx = checkActive(tx); + + if ((snapshot = tx.mvccSnapshot()) == null) { + MvccProcessor prc = cctx.shared().coordinators(); + + snapshot = prc.tryRequestSnapshotLocal(tx); + + if (snapshot == null) + // TODO IGNITE-7388 + snapshot = prc.requestSnapshotAsync(tx).get(); + + tx.mvccSnapshot(snapshot); + } + + return snapshot; + } + + /** */ + private static MvccVersion mvccVersion(long crd, long cntr, int opCntr) { + return new MvccVersionImpl(crd, cntr, opCntr); + } + + /** + * Mvcc closure interface. + * @param <R> Return type. + */ + private interface MvccClosure<R> { + /** + * Runs closure over the Mvcc info. + * @param snapshot Mvcc snapshot. + * @param mvccCrd Coordinator version. + * @param mvccCntr Counter. + * @param mvccOpCntr Operation counter. + * @param newMvccCrd New mvcc coordinator + * @param newMvccCntr New mvcc counter. + * @param newMvccOpCntr New mvcc operation counter. + * @return Result. + */ + public R apply(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, int mvccOpCntr, + long newMvccCrd, long newMvccCntr, int newMvccOpCntr) throws IgniteCheckedException; + } + + /** + * Closure for checking row visibility for snapshot. + */ + private static class GetVisibleState implements MvccClosure<Integer> { + /** {@inheritDoc} */ + @Override public Integer apply(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, + int mvccOpCntr, long newMvccCrd, long newMvccCntr, int newMvccOpCntr) throws IgniteCheckedException { + + if (!isVisible(cctx, snapshot, mvccCrd, mvccCntr, mvccOpCntr)) + return MVCC_INVISIBLE; + + if (newMvccCrd == MVCC_CRD_COUNTER_NA) + return MVCC_VISIBLE; + + assert mvccVersionIsValid(newMvccCrd, newMvccCntr, newMvccOpCntr); + + if (mvccCrd == newMvccCrd && mvccCntr == newMvccCntr) // Double-changed in scope of one transaction. 
+ return MVCC_VISIBLE_REMOVED; + + return isVisible(cctx, snapshot, newMvccCrd, newMvccCntr, newMvccOpCntr) ? MVCC_VISIBLE_REMOVED : + MVCC_VISIBLE; + } + } + + /** + * Closure for checking whether the row is visible for given snapshot. + */ + private static class IsVisible implements MvccClosure<Boolean> { + /** {@inheritDoc} */ + @Override public Boolean apply(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, + int mvccOpCntr, long newMvccCrd, long newMvccCntr, int newMvccOpCntr) throws IgniteCheckedException { + + if (!isVisible(cctx, snapshot, mvccCrd, mvccCntr, mvccOpCntr)) + return false; + + if (newMvccCrd == MVCC_CRD_COUNTER_NA) + return true; + + assert mvccVersionIsValid(newMvccCrd, newMvccCntr, newMvccOpCntr); + + if (mvccCrd == newMvccCrd && mvccCntr == newMvccCntr) // Double-changed in scope of one transaction. + return false; + + return !isVisible(cctx, snapshot, newMvccCrd, newMvccCntr, newMvccOpCntr); + } + } + + /** + * Closure for getting xid_max version of row. + */ + private static class GetNewVersion implements MvccClosure<MvccVersion> { + /** {@inheritDoc} */ + @Override public MvccVersion apply(GridCacheContext cctx, MvccSnapshot snapshot, long mvccCrd, long mvccCntr, + int mvccOpCntr, long newMvccCrd, long newMvccCntr, int newMvccOpCntr) { + return newMvccCrd == MVCC_CRD_COUNTER_NA ? 
null : mvccVersion(newMvccCrd, newMvccCntr, newMvccOpCntr); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersion.java new file mode 100644 index 0000000..f43d3b9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersion.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.jetbrains.annotations.NotNull; + +/** + * MVCC version. This is unique version allowing to order all reads and writes within a cluster. Consists of two parts: + * - coordinator version - number which increases on every coordinator change; + * - counter - local coordinator counter which is increased on every update. + */ +public interface MvccVersion extends Comparable<MvccVersion> { + /** + * @return Coordinator version. 
+ */ + public long coordinatorVersion(); + + /** + * @return Local counter. + */ + public long counter(); + + /** + * @return Operation id in scope of current transaction. + */ + public int operationCounter(); + + /** {@inheritDoc} */ + @Override default int compareTo(@NotNull MvccVersion another) { + return MvccUtils.compare(coordinatorVersion(), counter(), operationCounter(), + another.coordinatorVersion(), another.counter(), another.operationCounter()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionAware.java new file mode 100644 index 0000000..3bfefbc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionAware.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +/** + * + */ +public interface MvccVersionAware { + /** + * @return Mvcc coordinator version. + */ + public long mvccCoordinatorVersion(); + + /** + * @return Mvcc counter. + */ + public long mvccCounter(); + + /** + * @return Mvcc operation counter. + */ + public int mvccOperationCounter(); + + /** + * @return Tx state hint for 'created' mvcc version. + */ + public byte mvccTxState(); + + /** + * Copies mvcc version from another object. + * @param other Info source. + */ + public default void mvccVersion(MvccVersionAware other) { + mvccVersion(other.mvccCoordinatorVersion(), other.mvccCounter(), other.mvccOperationCounter()); + } + + /** + * Sets mvcc version. + * @param ver Mvcc version. + */ + public default void mvccVersion(MvccVersion ver) { + mvccVersion(ver.coordinatorVersion(), ver.counter(), ver.operationCounter()); + } + + /** + * Sets mvcc version. + * @param crd Mvcc coordinator version. + * @param cntr Mvcc counter. + * @param opCntr Mvcc operation counter. + */ + public default void mvccVersion(long crd, long cntr, int opCntr) { + throw new UnsupportedOperationException(); + } + + /** + * @return Mvcc version. 
+ */ + public default MvccVersion mvccVersion() { + return new MvccVersionImpl(mvccCoordinatorVersion(), mvccCounter(), mvccOperationCounter()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionImpl.java new file mode 100644 index 0000000..ec3e137 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccVersionImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * Base MVCC version implementation. 
+ */ +public class MvccVersionImpl implements MvccVersion, Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Coordinator version. */ + private long crdVer; + + /** Local counter. */ + private long cntr; + + /** Operation counter. */ + private int opCntr; + + /** + * Constructor. + */ + public MvccVersionImpl() { + // No-op. + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + * @param opCntr Operation counter. + */ + public MvccVersionImpl(long crdVer, long cntr, int opCntr) { + this.crdVer = crdVer; + this.cntr = cntr; + this.opCntr = opCntr; + } + + /** + * @return Coordinator version. + */ + public long coordinatorVersion() { + return crdVer; + } + + /** + * @return Local counter. + */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public int operationCounter() { + return opCntr; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MvccVersionImpl that = (MvccVersionImpl) o; + + return crdVer == that.crdVer && cntr == that.cntr; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = (int) (crdVer ^ (crdVer >>> 32)); + + res = 31 * res + (int) (cntr ^ (cntr >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("opCntr", opCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override 
public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + opCntr = reader.readInt("opCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccVersionImpl.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 148; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccVersionImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java new file mode 100644 index 0000000..b9a5132 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NoOpMvccProcessor.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.ExchangeContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.GridLongList; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +class NoOpMvccProcessor extends GridProcessorAdapter implements MvccProcessor { + /** + * @param ctx Kernal context. + */ + protected NoOpMvccProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onExchangeStart(MvccCoordinator mvccCrd, ExchangeContext exchCtx, ClusterNode exchCrd) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void onExchangeDone(boolean newCoord, DiscoCache discoCache, + Map<UUID, GridLongList> activeQueries) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Nullable @Override public MvccCoordinator currentCoordinator() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public MvccCoordinator currentCoordinator(AffinityTopologyVersion topVer) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public MvccCoordinator coordinatorFromDiscoveryEvent() { + return null; + } + + /** {@inheritDoc} */ + @Override public UUID currentCoordinatorId() { + return null; + } + + /** {@inheritDoc} */ + @Override public void updateCoordinator(MvccCoordinator curCrd) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public byte state(long crdVer, long cntr) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public byte state(MvccVersion ver) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void updateState(MvccVersion ver, byte state) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void updateState(MvccVersion ver, byte state, boolean primary) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void registerLocalTransaction(long crd, long cntr) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public boolean hasLocalTransaction(long crd, long cntr) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, + MvccVersion locked) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void addQueryTracker(MvccQueryTracker tracker) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void 
removeQueryTracker(Long id) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public MvccSnapshot tryRequestSnapshotLocal() { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public MvccSnapshot tryRequestSnapshotLocal( + @Nullable IgniteInternalTx tx) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void requestSnapshotAsync(MvccSnapshotResponseListener lsnr) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot, + long qryId) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void ackTxRollback(MvccVersion updateVer) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) { + throw processorException(); + } + + /** {@inheritDoc} */ + @Override public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) { + // No-op. 
+ } + + /** + * @return No-op processor usage exception; + */ + private IgniteException processorException() { + return new IgniteException("Current Ignite configuration does not support MVCC functionality " + + "(consider adding ignite-schedule module to classpath)."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java new file mode 100644 index 0000000..52fb1db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.jetbrains.annotations.NotNull; + +/** + * Simple MVCC tracker used only as an Mvcc snapshot holder. + */ +public class StaticMvccQueryTracker implements MvccQueryTracker { + /** */ + private final MvccSnapshot snapshot; + /** */ + private final GridCacheContext cctx; + + /** + * @param cctx Cache context. + * @param snapshot Mvcc snapshot. + */ + public StaticMvccQueryTracker(GridCacheContext cctx, MvccSnapshot snapshot) { + this.snapshot = snapshot; + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public MvccSnapshot snapshot() { + assert snapshot != null : this; + + return snapshot; + } + + /** {@inheritDoc} */ + @Override public GridCacheContext context() { + return cctx; + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return AffinityTopologyVersion.NONE; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot() { + return new GridFinishedFuture<>(snapshot); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshot(@NotNull final AffinityTopologyVersion topVer) { + return new GridFinishedFuture<>(snapshot); + } + + /** {@inheritDoc} */ + @Override public void requestSnapshot(@NotNull AffinityTopologyVersion topVer, @NotNull MvccSnapshotResponseListener lsnr) { + lsnr.onResponse(snapshot); + } + + /** {@inheritDoc} */ + @Override public void onDone() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> onDone(@NotNull GridNearTxLocal tx, boolean commit) { + throw new UnsupportedOperationException("Operation is not supported."); + } + + /** {@inheritDoc} */ + @Override public long onMvccCoordinatorChange(MvccCoordinator newCrd) { + return MVCC_TRACKER_ID_NA; + } + + /** {@inheritDoc} */ + @Override public long id() { + return MVCC_TRACKER_ID_NA; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetrics.java new file mode 100644 index 0000000..1de297f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetrics.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable accumulator of vacuum statistics: number of cleaned up rows, number of scanned rows
 * and the time (in nanoseconds) spent on search and on cleanup.
 * <p>
 * The counters are plain {@code long} fields updated with {@code +=}; the class performs
 * no synchronization on its own.
 */
public class VacuumMetrics {
    /** Nanoseconds in one millisecond. */
    private static final double NANOS_PER_MILLI = 1_000_000d;

    /** Cleaned up rows count. */
    private long cleanupRowsCnt;

    /** Scanned rows count. */
    private long scannedRowsCnt;

    /** Accumulated search time, nanoseconds. */
    private long searchNanoTime;

    /** Accumulated cleanup time, nanoseconds. */
    private long cleanupNanoTime;

    /**
     * @return Cleanup rows count.
     */
    public long cleanupRowsCount() {
        return cleanupRowsCnt;
    }

    /**
     * @return Scanned rows count.
     */
    public long scannedRowsCount() {
        return scannedRowsCnt;
    }

    /**
     * @return Search nano time.
     */
    public long searchNanoTime() {
        return searchNanoTime;
    }

    /**
     * @return Cleanup nano time.
     */
    public long cleanupNanoTime() {
        return cleanupNanoTime;
    }

    /**
     * @param delta Value to add to the cleanup rows count.
     */
    public void addCleanupRowsCnt(long delta) {
        cleanupRowsCnt += delta;
    }

    /**
     * @param delta Value to add to the scanned rows count.
     */
    public void addScannedRowsCount(long delta) {
        scannedRowsCnt += delta;
    }

    /**
     * @param delta Value to add to the search time, nanoseconds.
     */
    public void addSearchNanoTime(long delta) {
        searchNanoTime += delta;
    }

    /**
     * @param delta Value to add to the cleanup time, nanoseconds.
     */
    public void addCleanupNanoTime(long delta) {
        cleanupNanoTime += delta;
    }

    /**
     * Renders the counters; both times are printed in milliseconds (rounded to the nearest one),
     * although the labels keep the {@code ...NanoTime} field names.
     *
     * @return String representation.
     */
    @Override public String toString() {
        return "VacuumMetrics[" +
            "cleanupRowsCnt=" + cleanupRowsCnt +
            ", scannedRowsCnt=" + scannedRowsCnt +
            ", searchNanoTime=" + nanosToMillis(searchNanoTime) +
            " ms, cleanupNanoTime=" + nanosToMillis(cleanupNanoTime) +
            " ms]";
    }

    /**
     * Converts nanoseconds to milliseconds rounding to the nearest value.
     * Double arithmetic and the {@code long}-returning {@link Math#round(double)} are used on purpose:
     * a {@code float} conversion loses low-order digits of large nano values and
     * {@link Math#round(float)} saturates at {@link Integer#MAX_VALUE} milliseconds.
     *
     * @param nanos Time in nanoseconds.
     * @return Time in milliseconds.
     */
    private static long nanosToMillis(long nanos) {
        return Math.round(nanos / NANOS_PER_MILLI);
    }
}