Repository: ignite Updated Branches: refs/heads/ignite-3478 [created] f6e982540
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java new file mode 100644 index 0000000..d7e865a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorQueryAckRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long cntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorQueryAckRequest() { + // No-op. + } + + /** + * @param cntr Query counter. + */ + CoordinatorQueryAckRequest(long cntr) { + this.cntr = cntr; + } + + /** + * @return Counter. + */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorQueryAckRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 134; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorQueryAckRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java new file mode 100644 index 0000000..e893b22 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorQueryCounterRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorQueryCounterRequest() { + // No-op. + } + + /** + * @param futId Future ID. + */ + CoordinatorQueryCounterRequest(long futId) { + this.futId = futId; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorQueryCounterRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -33; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorQueryCounterRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java new file mode 100644 index 0000000..5c4108d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorTxAckRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int SKIP_RESPONSE_FLAG_MASK = 0x01; + + /** */ + private long futId; + + /** */ + private GridCacheVersion txId; + + /** */ + private byte flags; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorTxAckRequest() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txId Transaction ID. + */ + CoordinatorTxAckRequest(long futId, GridCacheVersion txId) { + this.futId = futId; + this.txId = txId; + } + + /** + * @return Future ID. + */ + long futureId() { + return futId; + } + + /** + * @return {@code True} if response message is not needed. + */ + boolean skipResponse() { + return (flags & SKIP_RESPONSE_FLAG_MASK) != 0; + } + + /** + * @param val {@code True} if response message is not needed. + */ + void skipResponse(boolean val) { + if (val) + flags |= SKIP_RESPONSE_FLAG_MASK; + else + flags &= ~SKIP_RESPONSE_FLAG_MASK; + } + + /** + * @return Transaction ID.s + */ + public GridCacheVersion txId() { + return txId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("txId", txId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + txId = reader.readMessage("txId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorTxAckRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 131; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorTxAckRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java new file mode 100644 index 0000000..c48ba4b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorTxAckResponse implements Message { + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorTxAckResponse() { + // No-op. + } + + /** + * @param futId Future ID. + */ + CoordinatorTxAckResponse(long futId) { + this.futId = futId; + } + + /** + * @return Future ID. + */ + long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorTxAckResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 132; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorTxAckResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java new file mode 100644 index 0000000..dbdefda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CoordinatorTxCounterRequest implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** */ + private GridCacheVersion txId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public CoordinatorTxCounterRequest() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txId Transaction ID. + */ + CoordinatorTxCounterRequest(long futId, GridCacheVersion txId) { + assert txId != null; + + this.futId = futId; + this.txId = txId; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return Transaction ID. + */ + public GridCacheVersion txId() { + return txId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("txId", txId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + txId = reader.readMessage("txId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CoordinatorTxCounterRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 129; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CoordinatorTxCounterRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java new file mode 100644 index 0000000..b47ed3c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class TxMvccVersion implements Comparable<TxMvccVersion> { + /** */ + public static final long COUNTER_NA = 0L; + + /** */ + private final long topVer; + + /** */ + private final long cntr; + + /** */ + private final GridCacheVersion txId; + + /** + * @param topVer Topology version. + * @param cntr Coordinator counter. + * @param txId Transaction ID. + */ + public TxMvccVersion(long topVer, long cntr, GridCacheVersion txId) { + assert topVer > 0 : topVer; + assert cntr != COUNTER_NA; + assert txId != null; + + this.topVer = topVer; + this.cntr = cntr; + this.txId = txId; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull TxMvccVersion other) { + int cmp = Long.compare(topVer, other.topVer); + + if (cmp != 0) + return cmp; + + return Long.compare(cntr, other.cntr); + } + + /** + * @return Coordinators topology version. + */ + public long topologyVersion() { + return topVer; + } + + /** + * @return Counters. + */ + public long counter() { + return cntr; + } + + /** + * @return Transaction ID. + */ + public GridCacheVersion txId() { + return txId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxMvccVersion.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 4e3998b..2071432 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -99,7 +99,7 @@ public class IgniteWalIteratorFactory { dbMgr.setPageSize(pageSize); return new GridCacheSharedContext<>( - kernalCtx, null, null, null, + kernalCtx, null, null, null, null, null, null, dbMgr, null, null, null, null, null, null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java new file mode 100644 index 0000000..ee6cfd0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheMvccTransactionsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTx1() throws Exception { + startGridsMultiThreaded(SRVS); + + try { + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + ignite(0).createCache(ccfg); + + try { + Ignite node = ignite(0); + + IgniteTransactions txs = node.transactions(); + + IgniteCache<Integer, Integer> cache = node.cache(ccfg.getName()); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer val = cache.get(key); + + assertNull(val); + + cache.put(key, key); + + val = cache.get(key); + + assertEquals(key, val); + + tx.commit(); + } + + Integer val = cache.get(key); + + assertEquals(key, val); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAll1() throws Exception { + startGridsMultiThreaded(SRVS); + + try { + client = true; + + Ignite ignite = startGrid(SRVS); + + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1); + + IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); + + Set<Integer> keys = new HashSet<>(); + + keys.addAll(primaryKeys(ignite(0).cache(ccfg.getName()), 2)); + + Map<Integer, Integer> res = cache.getAll(keys); + } + finally { + stopAllGrids(); + } + } + + /** + * @return Cache configurations. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2)); + ccfgs.add(cacheConfiguration(REPLICATED, FULL_SYNC, 0)); + + return ccfgs; + } + + /** + * @param ccfg Cache configuration. + */ + private void logCacheInfo(CacheConfiguration<?, ?> ccfg) { + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", sync=" + ccfg.getWriteSynchronizationMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + (ccfg.getNearConfiguration() != null) + + ']'); + } + + /** + * @param cache Cache. + * @return Test keys. + * @throws Exception If failed. + */ + private List<Integer> testKeys(IgniteCache<Integer, Integer> cache) throws Exception { + CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class); + + List<Integer> keys = new ArrayList<>(); + + if (ccfg.getCacheMode() == PARTITIONED) + keys.add(nearKey(cache)); + + keys.add(primaryKey(cache)); + + if (ccfg.getBackups() != 0) + keys.add(backupKey(cache)); + + return keys; + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + ccfg.setMvccEnabled(true); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 6f58782..5ae21a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -51,6 +51,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { null, null, null, + null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index b263d4f..13a254a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -52,6 +52,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest null, null, null, + null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java index d9257bd..adec72e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java @@ -67,6 +67,7 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{ null, null, null, + null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index 1fff1f0..f4e5991 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -56,6 +56,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { null, null, null, + null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 0366eca..bb80a09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -79,6 +79,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { null, null, null, + null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/f6e98254/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 6a1d4f4..ee43309 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; @@ -64,6 +65,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { ctx, new GridCacheSharedContext<>( ctx, + new CacheCoordinatorsSharedManager(), new IgniteTxManager(), new GridCacheVersionManager(), new GridCacheMvccManager(),
