IGNITE-6014 Added TX marker records. This closes #2578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5377af2c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5377af2c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5377af2c Branch: refs/heads/ignite-3478 Commit: 5377af2c24642944960b9953e15ac5badccb1a16 Parents: f9955fd Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Mon Sep 18 16:45:30 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Sep 18 16:56:19 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 8 + .../managers/discovery/ConsistentIdMapper.java | 101 ++++++++ .../internal/pagemem/wal/record/TxRecord.java | 136 ++++++---- .../cache/distributed/dht/GridDhtTxRemote.java | 5 + .../distributed/near/GridNearTxRemote.java | 5 + .../wal/serializer/RecordV1Serializer.java | 24 +- .../wal/serializer/TxRecordSerializer.java | 228 +++++++++++++++++ .../cache/transactions/IgniteTxAdapter.java | 46 ++++ .../cache/transactions/IgniteTxManager.java | 14 + .../db/wal/IgniteWalRecoveryTest.java | 256 +++++++++++++++++++ .../db/wal/reader/IgniteWalReaderTest.java | 4 +- .../db/wal/reader/MockWalIteratorFactory.java | 1 + 12 files changed, 768 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 39c19fb..ec79026 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -714,6 +714,14 @@ 
public final class IgniteSystemProperties { "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD"; /** + * If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records + * will be logged to WAL. + * + * Default value is {@code false}. + */ + public static final String IGNITE_WAL_LOG_TX_RECORDS = "IGNITE_WAL_LOG_TX_RECORDS"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java new file mode 100644 index 0000000..c524331 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Maps node UUIDs to consistent ids and vice versa. + */ +public class ConsistentIdMapper { + /** Discovery manager. */ + private final GridDiscoveryManager discoveryManager; + + /** + * Create an instance of mapper. + * + * @param discoveryManager Discovery manager. + */ + public ConsistentIdMapper(GridDiscoveryManager discoveryManager) { + this.discoveryManager = discoveryManager; + } + + /** + * Map UUID to consistent id. + * + * @param topVer Topology version. + * @param nodeId UUID of node. + * @return Consistent id of node. + * @throws IllegalStateException If no node with the given UUID is found in the given topology version. + */ + public Object mapToConsistentId(AffinityTopologyVersion topVer, UUID nodeId) { + ClusterNode node = discoveryManager.node(topVer, nodeId); + + if (node == null) + throw new IllegalStateException("Unable to find node by UUID [nodeId=" + nodeId + ", topVer=" + topVer + ']'); + + return node.consistentId(); + } + + /** + * Map consistent id to UUID. + * + * @param consistentId Consistent id of node. + * @return UUID of node, or {@code null} if no node with the given consistent id is found. + */ + @Nullable public UUID mapToUUID(Object consistentId) { + for (ClusterNode node : discoveryManager.allNodes()) + if (node.consistentId().equals(consistentId)) + return node.id(); + + return null; + } + + /** + * Map primary -> backup node UUIDs to consistent ids. + * + * @param topVer Topology version. + * @param txNodes Primary -> backup UUID nodes. + * @return Primary -> backup consistent id nodes, or {@code null} if {@code txNodes} is {@code null}. + * @throws IllegalStateException If any of the nodes is not found in the given topology version.
+ */ + public Map<Object, Collection<Object>> mapToConsistentIds(AffinityTopologyVersion topVer, @Nullable Map<UUID, Collection<UUID>> txNodes) { + if (txNodes == null) + return null; + + Map<Object, Collection<Object>> consistentMap = U.newHashMap(txNodes.keySet().size()); + + for (UUID node : txNodes.keySet()) { + Collection<UUID> backupNodes = txNodes.get(node); + + Collection<Object> consistentIdsBackups = new ArrayList<>(backupNodes.size()); + + for (UUID backup : backupNodes) + consistentIdsBackups.add(mapToConsistentId(topVer, backup)); + + consistentMap.put(mapToConsistentId(topVer, node), consistentIdsBackups); + } + + return consistentMap; + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java index 9bb747b..ce1e28e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java @@ -17,56 +17,61 @@ package org.apache.ignite.internal.pagemem.wal.record; -import java.util.UUID; +import java.util.Collection; +import java.util.Map; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.transactions.TransactionState; +import org.jetbrains.annotations.Nullable; /** * Logical data record indented for transaction (tx) related actions.<br> * This record is marker of begin, prepare, commit, and rollback transactions. */ public class TxRecord extends WALRecord { - /** - * Tx action enum. - */ - public enum TxAction { - /** Transaction begin. 
*/ BEGIN, - - /** Transaction prepare. */ - PREPARE, - - /** Transaction commit. */ - COMMIT, - - /** Transaction rollback. */ - ROLLBACK; - - /** Available values. */ - private static final TxAction[] VALS = TxAction.values(); - - /** - * Gets tx action value from ordinal. - * - * @param ord Ordinal. - * @return Value. - */ - public static TxAction fromOrdinal(int ord) { - return ord < 0 || ord >= VALS.length ? null : VALS[ord]; - } - } + /** Transaction state. */ + private TransactionState state; - /** */ - private TxAction action; - - /** Global transaction identifier within cluster, assigned by transaction coordinator */ + /** Global transaction identifier within cluster, assigned by transaction coordinator. */ private GridCacheVersion nearXidVer; - /** */ - private GridCacheVersion dhtVer; + /** Transaction entries write version. */ + private GridCacheVersion writeVer; + + /** + * Transaction participating nodes. + * + * Structure: + * Primary node -> [Backup nodes...] + */ + @Nullable private Map<Object, Collection<Object>> participatingNodes; + + /** If transaction is remote, primary node for this backup node. */ + @Nullable private Object primaryNode; + + /** Timestamp of Tx state change. */ + private long timestamp; - /** */ - private UUID[] participatingNodeIds; + /** + * Creates a record of a transaction state change. + * + * @param state Transaction state. + * @param nearXidVer Transaction id. + * @param writeVer Transaction entries write version. + * @param participatingNodes Primary -> Backup nodes participating in transaction. + * @param primaryNode If transaction is remote, primary node for this backup node; {@code null} otherwise. + * @param timestamp Timestamp of Tx state change in millis.
+ */ + public TxRecord(TransactionState state, + GridCacheVersion nearXidVer, + GridCacheVersion writeVer, + @Nullable Map<Object, Collection<Object>> participatingNodes, + @Nullable Object primaryNode, + long timestamp) { + this.state = state; + this.nearXidVer = nearXidVer; + this.writeVer = writeVer; + this.participatingNodes = participatingNodes; + this.primaryNode = primaryNode; + this.timestamp = timestamp; + } /** {@inheritDoc} */ @Override public RecordType type() { @@ -90,43 +95,64 @@ public class TxRecord extends WALRecord { /** * @return DHT version. */ - public GridCacheVersion dhtVersion() { - return dhtVer; + public GridCacheVersion writeVersion() { + return writeVer; + } + + /** + * @param writeVer DHT version. + */ + public void dhtVersion(GridCacheVersion writeVer) { + this.writeVer = writeVer; + } + + /** + * @return Transaction state. + */ + public TransactionState state() { + return state; + } + + /** + * @param state Transaction state. + */ + public void state(TransactionState state) { + this.state = state; } /** - * @param dhtVer DHT version. + * @return Primary -> backup participating nodes. */ - public void dhtVersion(GridCacheVersion dhtVer) { - this.dhtVer = dhtVer; + public Map<Object, Collection<Object>> participatingNodes() { + return participatingNodes; } /** - * @return Action. + * @param participatingNodeIds Primary -> backup participating nodes. */ - public TxAction action() { - return action; + public void participatingNodes(Map<Object, Collection<Object>> participatingNodeIds) { + this.participatingNodes = participatingNodeIds; } /** - * @param action Action. + * @return Is transaction remote for backup. */ - public void action(TxAction action) { - this.action = action; + public boolean remote() { + return primaryNode != null; } /** - * @param participatingNodeIds Participating node IDs. + * @return Primary node for backup if transaction is remote. 
*/ - public void participatingNodeIds(UUID[] participatingNodeIds) { - this.participatingNodeIds = participatingNodeIds; + @Nullable public Object primaryNode() { + return primaryNode; } /** - * @return Participating node IDs. + * @return Timestamp of Tx state change in millis. */ - public UUID[] participatingNodeId() { - return participatingNodeIds; + public long timestamp() { + return timestamp; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 4373cda..746eb38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -229,6 +229,11 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ + @Override public boolean remote() { + return true; + } + + /** {@inheritDoc} */ @Override public boolean dht() { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index e5cd469..5477af9 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -217,6 +217,11 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ + @Override public boolean remote() { + return true; + } + + /** {@inheritDoc} */ @Override public boolean near() { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index b78e2e3..ce6fdc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; @@ -83,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import 
org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; @@ -93,7 +95,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.RecordSeriali import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; @@ -139,6 +140,9 @@ public class RecordV1Serializer implements RecordSerializer { /** Write pointer. */ private final boolean writePointer; + /** Serializer of {@link TxRecord} records. */ + private TxRecordSerializer txRecordSerializer; + /** * @param cctx Cache shared context. 
*/ @@ -155,6 +159,7 @@ public class RecordV1Serializer implements RecordSerializer { co = cctx.kernalContext().cacheObjects(); pageSize = cctx.database().pageSize(); + txRecordSerializer = new TxRecordSerializer(cctx); } /** {@inheritDoc} */ @@ -661,6 +666,11 @@ public class RecordV1Serializer implements RecordSerializer { break; + case TX_RECORD: + txRecordSerializer.writeTxRecord((TxRecord) record, buf); + + break; + default: throw new UnsupportedOperationException("Type: " + record.type()); } @@ -1246,6 +1256,11 @@ public class RecordV1Serializer implements RecordSerializer { case SWITCH_SEGMENT_RECORD: throw new EOFException("END OF SEGMENT"); + case TX_RECORD: + res = txRecordSerializer.readTxRecord(in); + + break; + default: throw new UnsupportedOperationException("Type: " + recType); } @@ -1417,6 +1432,9 @@ public class RecordV1Serializer implements RecordSerializer { case SWITCH_SEGMENT_RECORD: return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead + case TX_RECORD: + return commonFields + txRecordSerializer.sizeOfTxRecord((TxRecord) record); + default: throw new UnsupportedOperationException("Type: " + record.type()); } @@ -1659,7 +1677,7 @@ public class RecordV1Serializer implements RecordSerializer { * @param ver Version to write. * @param allowNull Is {@code null}version allowed. */ - private void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) { + static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean allowNull) { CacheVersionIO.write(buf, ver, allowNull); } @@ -1670,7 +1688,7 @@ public class RecordV1Serializer implements RecordSerializer { * @param allowNull Is {@code null}version allowed. * @return Read cache version. 
*/ - private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException { + static GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean allowNull) throws IOException { // To be able to read serialization protocol version. in.ensure(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java new file mode 100644 index 0000000..448bdbc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.serializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.transactions.TransactionState; + +/** + * {@link TxRecord} WAL serializer. + */ +public class TxRecordSerializer { + /** Cache shared context. */ + private GridCacheSharedContext cctx; + + /** Class loader to unmarshal consistent ids. */ + private ClassLoader classLoader; + + /** + * Create an instance of serializer. + * + * @param cctx Cache shared context. + */ + public TxRecordSerializer(GridCacheSharedContext cctx) { + this.cctx = cctx; + + classLoader = U.resolveClassLoader(cctx.gridConfig()); + } + + /** + * Writes {@link TxRecord} to given buffer. + * + * @param record TxRecord. + * @param buf Byte buffer. + * @throws IgniteCheckedException In case of fail. 
+ */ + public void writeTxRecord(TxRecord record, ByteBuffer buf) throws IgniteCheckedException { + buf.put((byte) record.state().ordinal()); + RecordV1Serializer.putVersion(buf, record.nearXidVersion(), true); + RecordV1Serializer.putVersion(buf, record.writeVersion(), true); + + if (record.participatingNodes() != null) { + buf.putInt(record.participatingNodes().keySet().size()); + + for (Object primaryNode : record.participatingNodes().keySet()) { + writeConsistentId(primaryNode, buf); + + Collection<Object> backupNodes = record.participatingNodes().get(primaryNode); + + buf.putInt(backupNodes.size()); + + for (Object backupNode : backupNodes) { + writeConsistentId(backupNode, buf); + } + } + } + else { + // Put zero size of participating nodes. + buf.putInt(0); + } + + buf.put((byte) (record.remote() ? 1 : 0)); + + if (record.remote()) + writeConsistentId(record.primaryNode(), buf); + + buf.putLong(record.timestamp()); + } + + /** + * Reads {@link TxRecord} from given input. + * + * @param in Input + * @return TxRecord. + * @throws IOException In case of fail. + * @throws IgniteCheckedException In case of fail. 
+ */ + public TxRecord readTxRecord(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + byte txState = in.readByte(); + TransactionState state = TransactionState.fromOrdinal(txState); + + GridCacheVersion nearXidVer = RecordV1Serializer.readVersion(in, true); + GridCacheVersion writeVer = RecordV1Serializer.readVersion(in, true); + + int participatingNodesSize = in.readInt(); + Map<Object, Collection<Object>> participatingNodes = new HashMap<>(2 * participatingNodesSize); + + for (int i = 0; i < participatingNodesSize; i++) { + Object primaryNode = readConsistentId(in); + + int backupNodesSize = in.readInt(); + + Collection<Object> backupNodes = new ArrayList<>(backupNodesSize); + + for (int j = 0; j < backupNodesSize; j++) { + Object backupNode = readConsistentId(in); + + backupNodes.add(backupNode); + } + + participatingNodes.put(primaryNode, backupNodes); + } + + boolean hasRemote = in.readByte() == 1; + + Object primaryNode = null; + + if (hasRemote) + primaryNode = readConsistentId(in); + + long timestamp = in.readLong(); + + return new TxRecord(state, nearXidVer, writeVer, participatingNodes, primaryNode, timestamp); + } + + /** + * Returns size of marshalled {@link TxRecord} in bytes. + * + * @param record TxRecord. + * @return Size of TxRecord in bytes. + * @throws IgniteCheckedException In case of fail. + */ + public int sizeOfTxRecord(TxRecord record) throws IgniteCheckedException { + int size = 0; + + size += /* transaction state. */ 1; + size += CacheVersionIO.size(record.nearXidVersion(), true); + size += CacheVersionIO.size(record.writeVersion(), true); + + size += /* primary nodes count. */ 4; + + if (record.participatingNodes() != null) { + for (Object primaryNode : record.participatingNodes().keySet()) { + size += /* byte array length. */ 4; + size += marshalConsistentId(primaryNode).length; + + Collection<Object> backupNodes = record.participatingNodes().get(primaryNode); + + size += /* size of backup nodes. 
*/ 4; + + for (Object backupNode : backupNodes) { + size += /* byte array length. */ 4; + size += marshalConsistentId(backupNode).length; + } + } + } + + size += /* Is primary node exist. */ 1; + + if (record.remote()) { + size += /* byte array length. */ 4; + size += marshalConsistentId(record.primaryNode()).length; + } + + size += /* Timestamp */ 8; + + return size; + } + + /** + * Marshal consistent id to byte array. + * + * @param consistentId Consistent id. + * @return Marshalled byte array. + * @throws IgniteCheckedException In case of fail. + */ + private byte[] marshalConsistentId(Object consistentId) throws IgniteCheckedException { + return cctx.marshaller().marshal(consistentId); + } + + /** + * Read consistent id from given input. + * + * @param in Input. + * @return Consistent id. + * @throws IOException In case of fail. + * @throws IgniteCheckedException In case of fail. + */ + private Object readConsistentId(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + int len = in.readInt(); + in.ensure(len); + + byte[] content = new byte[len]; + in.readFully(content); + + return cctx.marshaller().unmarshal(content, classLoader); + } + + /** + * Write consistent id to given buffer. + * + * @param consistentId Consistent id. + * @param buf Byte buffer. + * @throws IgniteCheckedException In case of fail. 
+ */ + private void writeConsistentId(Object consistentId, ByteBuffer buf) throws IgniteCheckedException { + byte[] content = marshalConsistentId(consistentId); + + buf.putInt(content.length); + buf.put(content); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 4d85db5..c447436 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -39,10 +39,13 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.ConsistentIdMapper; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheLazyEntry; @@ -94,6 +97,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; 
import static org.apache.ignite.transactions.TransactionState.ACTIVE; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -245,6 +249,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** Store used flag. */ protected boolean storeEnabled = true; + /** UUID to consistent id mapper. */ + protected ConsistentIdMapper consistentIdMapper; + /** * Empty constructor required for {@link Externalizable}. */ @@ -308,6 +315,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (log == null) log = U.logger(cctx.kernalContext(), logRef, this); + + consistentIdMapper = new ConsistentIdMapper(cctx.discovery()); } /** @@ -357,6 +366,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (log == null) log = U.logger(cctx.kernalContext(), logRef, this); + + consistentIdMapper = new ConsistentIdMapper(cctx.discovery()); } /** {@inheritDoc} */ @@ -570,6 +581,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return log; } + /** + * @return True if transaction reflects changes in primary -> backup direction. + */ + public boolean remote() { + return false; + } + /** {@inheritDoc} */ @Override public boolean near() { return false; @@ -1075,6 +1093,34 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement // Seal transactions maps. if (state != ACTIVE && state != SUSPENDED) seal(); + + if (cctx.wal() != null && cctx.tm().logTxRecords()) { + // Log tx state change to WAL. 
+ if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) { + assert txNodes != null || state == ROLLED_BACK; + + Map<Object, Collection<Object>> participatingNodes = consistentIdMapper + .mapToConsistentIds(topVer, txNodes); + + TxRecord txRecord = new TxRecord( + state, + nearXidVersion(), + writeVersion(), + participatingNodes, + remote() ? nodeId() : null, + U.currentTimeMillis() + ); + + try { + cctx.wal().log(txRecord); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to log TxRecord: " + txRecord, e); + + throw new IgniteException("Failed to log TxRecord: " + txRecord, e); + } + } + } } return valid; http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 474b484..9a8280f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -98,6 +99,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_C import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_TX; @@ -202,6 +204,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** TxDeadlock detection. */ private TxDeadlockDetection txDeadlockDetection; + /** Flag indicates that {@link TxRecord} records will be logged to WAL. */ + private boolean logTxRecords; + /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_TX); @@ -276,6 +281,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { this.txDeadlockDetection = new TxDeadlockDetection(cctx); cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); + + this.logTxRecords = IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false); } /** @@ -2315,6 +2322,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return True if {@link TxRecord} records should be logged to WAL. + */ + public boolean logTxRecords() { + return logTxRecords; + } + + /** * Timeout object for node failure handler. 
*/ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index 399e36d..718a9a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -22,9 +22,12 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -33,8 +36,11 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; @@ -53,14 +59,18 @@ import org.apache.ignite.internal.pagemem.FullPageId; import 
org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; @@ -75,6 +85,9 @@ import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import org.junit.Assert; import sun.nio.ch.DirectBuffer; @@ -1014,6 +1027,249 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { } /** + * Test recovery from WAL on 3 nodes in case of transactional cache. + * + * @throws Exception If fail. 
+ */ + public void testRecoveryOnTransactionalAndPartitionedCache() throws Exception { + IgniteEx ignite = (IgniteEx) startGrids(3); + ignite.active(true); + + try { + final String cacheName = "transactional"; + + CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(2); + + ignite.createCache(cacheConfiguration); + + IgniteCache<Object, Object> cache = ignite.cache(cacheName); + Map<Object, Object> map = new HashMap<>(); + + final int transactions = 100; + final int operationsPerTransaction = 40; + + Random random = new Random(); + + for (int t = 1; t <= transactions; t++) { + Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + + Map<Object, Object> changesInTransaction = new HashMap<>(); + + for (int op = 0; op < operationsPerTransaction; op++) { + int key = random.nextInt(1000) + 1; + + Object value; + if (random.nextBoolean()) + value = randomString(random) + key; + else + value = new BigObject(key); + + changesInTransaction.put(key, value); + + cache.put(key, value); + } + + if (random.nextBoolean()) { + tx.commit(); + map.putAll(changesInTransaction); + } + else { + tx.rollback(); + } + + if (t % 50 == 0) + log.info("Finished transaction " + t); + } + + stopAllGrids(); + + ignite = (IgniteEx) startGrids(3); + ignite.active(true); + + cache = ignite.cache(cacheName); + + for (Object key : map.keySet()) { + Object expectedValue = map.get(key); + Object actualValue = cache.get(key); + Assert.assertEquals("Unexpected value for key " + key, expectedValue, actualValue); + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test that all DataRecord WAL records are within 
transaction boundaries - PREPARED and COMMITTED markers. + * + * @throws Exception If any fail. + */ + public void testTxRecordsConsistency() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true"); + + IgniteEx ignite = (IgniteEx) startGrids(3); + ignite.active(true); + + try { + final String cacheName = "transactional"; + + CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setCacheMode(CacheMode.PARTITIONED) + .setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(0); + + ignite.createCache(cacheConfiguration); + + IgniteCache<Object, Object> cache = ignite.cache(cacheName); + + GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context(); + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); + + db.waitForCheckpoint("test"); + db.enableCheckpoints(false).get(); + + // Log something to know where to start. 
+ WALPointer startPtr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); + + final int transactions = 100; + final int operationsPerTransaction = 40; + + Random random = new Random(); + + for (int t = 1; t <= transactions; t++) { + Transaction tx = ignite.transactions().txStart( + TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED); + + for (int op = 0; op < operationsPerTransaction; op++) { + int key = random.nextInt(1000) + 1; + + Object value; + if (random.nextBoolean()) + value = randomString(random) + key; + else + value = new BigObject(key); + + cache.put(key, value); + } + + if (random.nextBoolean()) { + tx.commit(); + } + else { + tx.rollback(); + } + + if (t % 50 == 0) + log.info("Finished transaction " + t); + } + + Set<GridCacheVersion> activeTransactions = new HashSet<>(); + + // Check that all DataRecords are within PREPARED and COMMITTED tx records. + try (WALIterator it = sharedCtx.wal().replay(startPtr)) { + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + + WALRecord rec = tup.get2(); + + if (rec instanceof TxRecord) { + TxRecord txRecord = (TxRecord) rec; + GridCacheVersion txId = txRecord.nearXidVersion(); + + switch (txRecord.state()) { + case PREPARED: + assert !activeTransactions.contains(txId) : "Transaction is already present " + txRecord; + + activeTransactions.add(txId); + + break; + case COMMITTED: + assert activeTransactions.contains(txId) : "No PREPARE marker for transaction " + txRecord; + + activeTransactions.remove(txId); + + break; + case ROLLED_BACK: + activeTransactions.remove(txId); + break; + + default: + throw new IllegalStateException("Unknown Tx state of record " + txRecord); + } + } else if (rec instanceof DataRecord) { + DataRecord dataRecord = (DataRecord) rec; + + for (DataEntry entry : dataRecord.writeEntries()) { + GridCacheVersion txId = entry.nearXidVersion(); + + assert activeTransactions.contains(txId) : "No transaction for entry " + entry; + } 
+ } + } + } + } + finally { + System.clearProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS); + stopAllGrids(); + } + } + + /** + * Generate random lowercase string for test purposes. + */ + private String randomString(Random random) { + int len = random.nextInt(50) + 1; + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < len; i++) + sb.append(random.nextInt(26) + 'a'); + + return sb.toString(); + } + + /** + * BigObject for test purposes that don't fit in page size. + */ + private static class BigObject { + private final int index; + + private final byte[] payload = new byte[4096]; + + BigObject(int index) { + this.index = index; + // Create pseudo-random array. + for (int i = 0; i < payload.length; i++) + if (i % index == 0) + payload[i] = (byte) index; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BigObject bigObject = (BigObject) o; + return index == bigObject.index && + Arrays.equals(payload, bigObject.payload); + } + + @Override + public int hashCode() { + return Objects.hash(index, payload); + } + } + + /** * @param size Size of data. * @return Test data. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 10e637b..ebb80a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -105,7 +105,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * Field for transferring setting from test to getConfig method * Archive incomplete segment after inactivity milliseconds. 
*/ - private int archiveIncompleteSegmentAfterInactivityMs = 0; + private int archiveIncompleteSegmentAfterInactivityMs; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -791,7 +791,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { final TxRecord txRecord = (TxRecord)walRecord; final GridCacheVersion globalTxId = txRecord.nearXidVersion(); - log.info("//Tx Record, action: " + txRecord.action() + + log.info("//Tx Record, state: " + txRecord.state() + "; nearTxVersion" + globalTxId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java index f90ae37..4030e53 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java @@ -104,6 +104,7 @@ public class MockWalIteratorFactory { when(sctx.kernalContext()).thenReturn(ctx); when(sctx.discovery()).thenReturn(disco); + when(sctx.gridConfig()).thenReturn(cfg); final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class);