IGNITE-2864 Need update local store from primary and backups
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f2bafbc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f2bafbc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f2bafbc Branch: refs/heads/ignite-3163 Commit: 9f2bafbcad81c169f33d04c8f04ee4c2fe4f1d36 Parents: 7f889ee Author: Anton Vinogradov <[email protected]> Authored: Fri May 13 17:30:43 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri May 13 17:30:43 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../cache/GridCacheSharedContext.java | 34 ++ .../GridDistributedTxRemoteAdapter.java | 7 + .../dht/atomic/GridDhtAtomicCache.java | 17 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 109 +++-- .../store/GridCacheStoreManagerAdapter.java | 29 +- .../cache/transactions/IgniteInternalTx.java | 5 + .../cache/transactions/IgniteTxAdapter.java | 252 ++++++++++ .../IgniteTxImplicitSingleStateImpl.java | 9 +- .../transactions/IgniteTxLocalAdapter.java | 237 --------- .../cache/transactions/IgniteTxLocalEx.java | 5 - .../IgniteTxRemoteSingleStateImpl.java | 18 +- .../IgniteTxRemoteStateAdapter.java | 9 - .../transactions/IgniteTxRemoteStateImpl.java | 34 +- .../cache/transactions/IgniteTxStateImpl.java | 2 +- .../CacheStoreUsageMultinodeAbstractTest.java | 5 +- .../GridCacheAbstractLocalStoreSelfTest.java | 478 +++++++++++++++++-- .../GridCachePartitionedLocalStoreSelfTest.java | 6 - ...chePartitionedOffHeapLocalStoreSelfTest.java | 6 - .../GridCacheReplicatedLocalStoreSelfTest.java | 6 - ...ridCacheTxPartitionedLocalStoreSelfTest.java | 6 - 22 files changed, 912 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 2b643b2..1af8b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -381,6 +381,9 @@ public final class IgniteSystemProperties { /** Maximum number of nested listener calls before listener notification becomes asynchronous. */ public static final String IGNITE_MAX_NESTED_LISTENER_CALLS = "IGNITE_MAX_NESTED_LISTENER_CALLS"; + /** Indicating whether local store keeps primary only. Backward compatibility flag. */ + public static final String IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY = "IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY"; + /** * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for * {@link Serializable} classes. http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index ef271f1..341f610 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -24,8 +24,10 @@ import java.util.ListIterator; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; @@ -53,6 +55,8 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY; + /** * Shared context. */ @@ -94,6 +98,12 @@ public class GridCacheSharedContext<K, V> { /** Store session listeners. */ private Collection<CacheStoreSessionListener> storeSesLsnrs; + /** Local store count. */ + private final AtomicInteger locStoreCnt; + + /** Indicating whether local store keeps primary only. */ + private final boolean locStorePrimaryOnly = IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY); + /** * @param kernalCtx Context. * @param txMgr Transaction manager. @@ -125,6 +135,8 @@ public class GridCacheSharedContext<K, V> { txMetrics = new TransactionMetricsAdapter(); ctxMap = new ConcurrentHashMap<>(); + + locStoreCnt = new AtomicInteger(); } /** @@ -242,6 +254,11 @@ public class GridCacheSharedContext<K, V> { ", conflictingCacheName=" + existing.name() + ']'); } + CacheStoreManager mgr = cacheCtx.store(); + + if (mgr.configured() && mgr.isLocal()) + locStoreCnt.incrementAndGet(); + ctxMap.put(cacheCtx.cacheId(), cacheCtx); } @@ -253,6 +270,11 @@ public class GridCacheSharedContext<K, V> { ctxMap.remove(cacheId, cacheCtx); + CacheStoreManager mgr = cacheCtx.store(); + + if (mgr.configured() && mgr.isLocal()) + locStoreCnt.decrementAndGet(); + // Safely clean up the message listeners. ioMgr.removeHandlers(cacheId); } @@ -464,6 +486,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Count of caches with configured local stores. + */ + public int getLocalStoreCount() { + return locStoreCnt.get(); + } + + /** * @param nodeId Node ID. * @return Node or {@code null}. */ @@ -471,6 +500,11 @@ public class GridCacheSharedContext<K, V> { return kernalCtx.discovery().node(nodeId); } + /** Indicating whether local store keeps primary only. */ + public boolean localStorePrimaryOnly() { + return locStorePrimaryOnly; + } + /** * Gets grid logger for given class. * http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 1fd0b2e..51be6e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -452,6 +452,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter AffinityTopologyVersion topVer = topologyVersion(); + batchStoreCommit(writeMap().values()); + // Node that for near transactions we grab all entries. for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { GridCacheContext cacheCtx = txEntry.context(); @@ -770,6 +772,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter return explicitVers == null ? Collections.<GridCacheVersion>emptyList() : explicitVers; } + /** {@inheritDoc} */ + @Override public void commitError(Throwable e) { + // No-op. + } + /** * Adds explicit version if there is one. * http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 25e4e3f..6aad533 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -144,7 +144,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** Update reply closure. */ private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; - /** Pending */ + /** Pending */ private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>(); /** */ @@ -1414,10 +1414,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... - writeThrough() && !req.skipStore() && // and store is enabled ... - !ctx.store().isLocal() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. + if (keys.size() > 1 && // Several keys ... + writeThrough() && !req.skipStore() && // and store is enabled ... + !ctx.store().isLocal() && // and this is not local store ... + // (conflict resolver should be used for local store) + !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult updRes = updateWithBatch(node, @@ -2043,7 +2044,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, writeVal, req.invokeArguments(), - primary && writeThrough() && !req.skipStore(), + (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())) + && writeThrough() && !req.skipStore(), !req.skipStore(), sndPrevVal || req.returnValue(), req.keepBinary(), @@ -2801,7 +2803,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, op == TRANSFORM ? entryProcessor : val, op == TRANSFORM ? req.invokeArguments() : null, - /*write-through*/false, + /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) + && writeThrough() && !req.skipStore(), /*read-through*/false, /*retval*/false, req.keepBinary(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 47f6cb2..df44455 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -262,7 +262,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, cctx.deploymentEnabled(), - this.updateReq.keepBinary()); + this.updateReq.keepBinary(), + this.updateReq.skipStore()); mappings.put(nodeId, updateReq); } @@ -323,7 +324,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.updateReq.taskNameHash(), forceTransformBackups ? this.updateReq.invokeArguments() : null, cctx.deploymentEnabled(), - this.updateReq.keepBinary()); + this.updateReq.keepBinary(), + this.updateReq.skipStore()); mappings.put(nodeId, updateReq); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 126cd83..ed50f41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -46,6 +46,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; + /** * Lite dht cache backup update request. */ @@ -57,6 +59,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid public static final int CACHE_MSG_IDX = nextIndexId(); /** Node ID. */ + @GridDirectTransient private UUID nodeId; /** Future version. */ @@ -159,6 +162,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid private boolean keepBinary; /** + * Additional flags. + */ + private byte flags; + + /** * Empty constructor required by {@link Externalizable}. */ public GridDhtAtomicUpdateRequest() { @@ -192,7 +200,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid int taskNameHash, Object[] invokeArgs, boolean addDepInfo, - boolean keepBinary + boolean keepBinary, + boolean skipStore ) { assert invokeArgs == null || forceTransformBackups; @@ -209,6 +218,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.addDepInfo = addDepInfo; this.keepBinary = keepBinary; + if (skipStore) + flags = (byte)(flags | SKIP_STORE_FLAG_MASK); + keys = new ArrayList<>(); partIds = new ArrayList<>(); @@ -465,6 +477,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @return Skip write-through to a persistent storage. + */ + public boolean skipStore() { + return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK; + } + + /** * @param updCntr Update counter. * @return Update counter. */ @@ -717,114 +736,120 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 6: - if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("futVer", futVer)) + if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) return false; writer.incrementState(); case 8: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeMessage("futVer", futVer)) return false; writer.incrementState(); case 9: - if (!writer.writeBoolean("keepBinary", keepBinary)) + if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 10: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeBoolean("keepBinary", keepBinary)) return false; writer.incrementState(); case 11: - if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 12: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 13: - if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); case 14: - if (!writer.writeMessage("nearTtls", nearTtls)) + if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); case 16: - if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("updateCntrs", updateCntrs)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 23: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("updateCntrs", updateCntrs)) return false; writer.incrementState(); case 24: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 25: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -871,7 +896,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 6: - forceTransformBackups = reader.readBoolean("forceTransformBackups"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -879,7 +904,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 7: - futVer = reader.readMessage("futVer"); + forceTransformBackups = reader.readBoolean("forceTransformBackups"); if (!reader.isLastRead()) return false; @@ -887,7 +912,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 8: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); + futVer = reader.readMessage("futVer"); if (!reader.isLastRead()) return false; @@ -895,7 +920,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 9: - keepBinary = reader.readBoolean("keepBinary"); + invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) return false; @@ -903,7 +928,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 10: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); + keepBinary = reader.readBoolean("keepBinary"); if (!reader.isLastRead()) return false; @@ -911,7 +936,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 11: - nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -919,7 +944,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 12: - nearExpireTimes = reader.readMessage("nearExpireTimes"); + nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -927,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 13: - nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); + nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) return false; @@ -935,7 +960,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 14: - nearTtls = reader.readMessage("nearTtls"); + nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -943,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 15: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) return false; @@ -951,7 +976,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -959,7 +984,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: - subjId = reader.readUuid("subjId"); + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -967,6 +992,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 18: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -978,7 +1011,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -986,7 +1019,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 21: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -994,7 +1027,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 22: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -1002,7 +1035,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 23: updateCntrs = reader.readMessage("updateCntrs"); if (!reader.isLastRead()) @@ -1010,7 +1043,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 23: + case 24: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -1018,7 +1051,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 24: + case 25: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -1051,7 +1084,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 26; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 7ffebbd..18d9c7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -31,9 +31,11 @@ import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.cache.store.CacheStoreSessionListener; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@ -60,12 +62,14 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY; + /** * Store manager. */ @@ -74,6 +78,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** */ private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key(); + /** + * Behavior can be changed by setting {@link IgniteSystemProperties#IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY} property + * to {@code True}. + */ + private static final IgniteProductVersion LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE = + IgniteProductVersion.fromString("1.5.22"); + /** */ protected CacheStore<Object, Object> store; @@ -217,6 +228,22 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt globalSesLsnrs = true; } + + if (isLocal()) { + for (ClusterNode node : cctx.kernalContext().cluster().get().forRemotes().nodes()) { + if (LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE.compareTo(node.version()) > 0 && + !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY)) { + IgniteProductVersion v = LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE; + + log.warning("Since Ignite " + v.major() + "." + v.minor() + "." + v.maintenance() + + " Local Store keeps primary and backup partitions. " + + "To keep primary partitions only please set system property " + + IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY + " to 'true'."); + + break; + } + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index f5f99f5..07b21ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -719,4 +719,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { * @param topVer New topology version. */ public void onRemap(AffinityTopologyVersion topVer); + + /** + * @param e Commit error. + */ + public void commitError(Throwable e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 3286689..66d4df0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +43,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; +import org.apache.ignite.internal.processors.cache.CacheLazyEntry; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -49,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -1211,6 +1216,248 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** + * @param stores Store managers. + * @return If {@code isWriteToStoreFromDht} value same for all stores. + */ + protected boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) { + if (stores != null && !stores.isEmpty()) { + boolean exp = F.first(stores).isWriteToStoreFromDht(); + + for (CacheStoreManager store : stores) { + if (store.isWriteToStoreFromDht() != exp) + return false; + } + } + + return true; + } + + /** + * @param stores Store managers. + * @param commit Commit flag. + * @throws IgniteCheckedException In case of error. + */ + protected void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException { + Iterator<CacheStoreManager> it = stores.iterator(); + + while (it.hasNext()) { + CacheStoreManager store = it.next(); + + store.sessionEnd(this, commit, !it.hasNext()); + } + } + + /** + * Performs batch database operations. This commit must be called + * before cache update. This way if there is a DB failure, + * cache transaction can still be rolled back. + * + * @param writeEntries Transaction write set. + * @throws IgniteCheckedException If batch update failed. + */ + @SuppressWarnings({"CatchGenericClass"}) + protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { + if (!storeEnabled() || internal() || + (!local() && near())) // No need to work with local store at GridNearTxRemote. + return; + + Collection<CacheStoreManager> stores = txState().stores(cctx); + + if (stores == null || stores.isEmpty()) + return; + + assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction"; + + CacheStoreManager first = F.first(stores); + + boolean isWriteToStoreFromDht = first.isWriteToStoreFromDht(); + + if ((local() || first.isLocal()) && (near() || isWriteToStoreFromDht)) { + try { + if (writeEntries != null) { + Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; + List<Object> rmvCol = null; + CacheStoreManager writeStore = null; + + boolean skipNonPrimary = near() && isWriteToStoreFromDht; + + for (IgniteTxEntry e : writeEntries) { + boolean skip = e.skipStore(); + + if (!skip && skipNonPrimary) { + skip = e.cached().isNear() || + e.cached().detached() || + !e.context().affinity().primary(e.cached().partition(), topologyVersion()).isLocal(); + } + + if (!skip && !local() && // Update local store at backups only if needed. + cctx.localStorePrimaryOnly()) + skip = true; + + if (skip) + continue; + + boolean intercept = e.context().config().getInterceptor() != null; + + if (intercept || !F.isEmpty(e.entryProcessors())) + e.cached().unswap(false); + + IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false); + + GridCacheContext cacheCtx = e.context(); + + GridCacheOperation op = res.get1(); + KeyCacheObject key = e.key(); + CacheObject val = res.get2(); + GridCacheVersion ver = writeVersion(); + + if (op == CREATE || op == UPDATE) { + // Batch-process all removes if needed. + if (rmvCol != null && !rmvCol.isEmpty()) { + assert writeStore != null; + + writeStore.removeAll(this, rmvCol); + + // Reset. + rmvCol.clear(); + + writeStore = null; + } + + // Batch-process puts if cache ID has changed. + if (writeStore != null && writeStore != cacheCtx.store()) { + if (putMap != null && !putMap.isEmpty()) { + writeStore.putAll(this, putMap); + + // Reset. + putMap.clear(); + } + + writeStore = null; + } + + if (intercept) { + Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut( + new CacheLazyEntry( + cacheCtx, + key, + e.cached().rawGetOrUnmarshal(true), + e.keepBinary()), + cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false)); + + if (interceptorVal == null) + continue; + + val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); + } + + if (writeStore == null) + writeStore = cacheCtx.store(); + + if (writeStore.isWriteThrough()) { + if (putMap == null) + putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); + + putMap.put(key, F.<Object, GridCacheVersion>t(val, ver)); + } + } + else if (op == DELETE) { + // Batch-process all puts if needed. + if (putMap != null && !putMap.isEmpty()) { + assert writeStore != null; + + writeStore.putAll(this, putMap); + + // Reset. + putMap.clear(); + + writeStore = null; + } + + if (writeStore != null && writeStore != cacheCtx.store()) { + if (rmvCol != null && !rmvCol.isEmpty()) { + writeStore.removeAll(this, rmvCol); + + // Reset. + rmvCol.clear(); + } + + writeStore = null; + } + + if (intercept) { + IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove( + new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true), e.keepBinary())); + + if (cacheCtx.cancelRemove(t)) + continue; + } + + if (writeStore == null) + writeStore = cacheCtx.store(); + + if (writeStore.isWriteThrough()) { + if (rmvCol == null) + rmvCol = new ArrayList<>(); + + rmvCol.add(key); + } + } + else if (log.isDebugEnabled()) + log.debug("Ignoring NOOP entry for batch store commit: " + e); + } + + if (putMap != null && !putMap.isEmpty()) { + assert rmvCol == null || rmvCol.isEmpty(); + assert writeStore != null; + + // Batch put at the end of transaction. + writeStore.putAll(this, putMap); + } + + if (rmvCol != null && !rmvCol.isEmpty()) { + assert putMap == null || putMap.isEmpty(); + assert writeStore != null; + + // Batch remove at the end of transaction. + writeStore.removeAll(this, rmvCol); + } + } + + // Commit while locks are held. + sessionEnd(stores, true); + } + catch (IgniteCheckedException ex) { + commitError(ex); + + setRollbackOnly(); + + // Safe to remove transaction from committed tx list because nothing was committed yet. + cctx.tm().removeCommittedTx(this); + + throw ex; + } + catch (Throwable ex) { + commitError(ex); + + setRollbackOnly(); + + // Safe to remove transaction from committed tx list because nothing was committed yet. + cctx.tm().removeCommittedTx(this); + + if (ex instanceof Error) + throw (Error)ex; + + throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); + } + finally { + if (isRollbackOnly()) + sessionEnd(stores, false); + } + } + } + + /** * @param txEntry Entry to process. * @param metrics {@code True} if metrics should be updated. * @return Tuple containing transformation results. @@ -1788,6 +2035,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public void commitError(Throwable e) { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ @Override public boolean empty() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 3e0231e..66069a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -163,13 +163,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { CacheStoreManager store = cacheCtx.store(); - if (store.configured()) { - HashSet<CacheStoreManager> set = new HashSet<>(3, 0.75f); - - set.add(store); - - return set; - } + if (store.configured()) + return Collections.singleton(store); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 962f2d5..312d3a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -42,7 +41,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; -import org.apache.ignite.internal.processors.cache.CacheLazyEntry; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -629,209 +627,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return cacheCtx.cache().entryEx(key.key(), topVer); } - /** - * Performs batch database operations. This commit must be called - * before {@link #userCommit()}. This way if there is a DB failure, - * cache transaction can still be rolled back. - * - * @param writeEntries Transaction write set. - * @throws IgniteCheckedException If batch update failed. - */ - @SuppressWarnings({"CatchGenericClass"}) - protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { - if (!storeEnabled() || internal()) - return; - - Collection<CacheStoreManager> stores = txState.stores(cctx); - - if (stores == null || stores.isEmpty()) - return; - - assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction"; - - boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht(); - - if (near() || isWriteToStoreFromDht) { - try { - if (writeEntries != null) { - Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null; - List<Object> rmvCol = null; - CacheStoreManager writeStore = null; - - boolean skipNonPrimary = near() && isWriteToStoreFromDht; - - for (IgniteTxEntry e : writeEntries) { - boolean skip = e.skipStore(); - - if (!skip && skipNonPrimary) { - skip = e.cached().isNear() || - e.cached().detached() || - !e.context().affinity().primary(e.cached().partition(), topologyVersion()).isLocal(); - } - - if (skip) - continue; - - boolean intercept = e.context().config().getInterceptor() != null; - - if (intercept || !F.isEmpty(e.entryProcessors())) - e.cached().unswap(false); - - IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false); - - GridCacheContext cacheCtx = e.context(); - - GridCacheOperation op = res.get1(); - KeyCacheObject key = e.key(); - CacheObject val = res.get2(); - GridCacheVersion ver = writeVersion(); - - if (op == CREATE || op == UPDATE) { - // Batch-process all removes if needed. - if (rmvCol != null && !rmvCol.isEmpty()) { - assert writeStore != null; - - writeStore.removeAll(this, rmvCol); - - // Reset. - rmvCol.clear(); - - writeStore = null; - } - - // Batch-process puts if cache ID has changed. - if (writeStore != null && writeStore != cacheCtx.store()) { - if (putMap != null && !putMap.isEmpty()) { - writeStore.putAll(this, putMap); - - // Reset. - putMap.clear(); - } - - writeStore = null; - } - - if (intercept) { - Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut( - new CacheLazyEntry( - cacheCtx, - key, - e.cached().rawGetOrUnmarshal(true), - e.keepBinary()), - cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false)); - - if (interceptorVal == null) - continue; - - val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal)); - } - - if (writeStore == null) - writeStore = cacheCtx.store(); - - if (writeStore.isWriteThrough()) { - if (putMap == null) - putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); - - putMap.put(key, F.<Object, GridCacheVersion>t(val, ver)); - } - } - else if (op == DELETE) { - // Batch-process all puts if needed. - if (putMap != null && !putMap.isEmpty()) { - assert writeStore != null; - - writeStore.putAll(this, putMap); - - // Reset. - putMap.clear(); - - writeStore = null; - } - - if (writeStore != null && writeStore != cacheCtx.store()) { - if (rmvCol != null && !rmvCol.isEmpty()) { - writeStore.removeAll(this, rmvCol); - - // Reset. - rmvCol.clear(); - } - - writeStore = null; - } - - if (intercept) { - IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove( - new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true), e.keepBinary())); - - if (cacheCtx.cancelRemove(t)) - continue; - } - - if (writeStore == null) - writeStore = cacheCtx.store(); - - if (writeStore.isWriteThrough()) { - if (rmvCol == null) - rmvCol = new ArrayList<>(); - - rmvCol.add(key); - } - } - else if (log.isDebugEnabled()) - log.debug("Ignoring NOOP entry for batch store commit: " + e); - } - - if (putMap != null && !putMap.isEmpty()) { - assert rmvCol == null || rmvCol.isEmpty(); - assert writeStore != null; - - // Batch put at the end of transaction. - writeStore.putAll(this, putMap); - } - - if (rmvCol != null && !rmvCol.isEmpty()) { - assert putMap == null || putMap.isEmpty(); - assert writeStore != null; - - // Batch remove at the end of transaction. - writeStore.removeAll(this, rmvCol); - } - } - - // Commit while locks are held. - sessionEnd(stores, true); - } - catch (IgniteCheckedException ex) { - commitError(ex); - - setRollbackOnly(); - - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); - - throw ex; - } - catch (Throwable ex) { - commitError(ex); - - setRollbackOnly(); - - // Safe to remove transaction from committed tx list because nothing was committed yet. - cctx.tm().removeCommittedTx(this); - - if (ex instanceof Error) - throw (Error)ex; - - throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); - } - finally { - if (isRollbackOnly()) - sessionEnd(stores, false); - } - } - } - /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass"}) @Override public void userCommit() throws IgniteCheckedException { @@ -1314,21 +1109,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** - * @param stores Store managers. - * @param commit Commit flag. - * @throws IgniteCheckedException In case of error. - */ - private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException { - Iterator<CacheStoreManager> it = stores.iterator(); - - while (it.hasNext()) { - CacheStoreManager store = it.next(); - - store.sessionEnd(this, commit, !it.hasNext()); - } - } - - /** * @param entry Entry. * @return {@code True} if local node is current primary for given entry. */ @@ -3989,23 +3769,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** - * @param stores Store managers. - * @return If {@code isWriteToStoreFromDht} value same for all stores. - */ - private boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) { - if (stores != null && !stores.isEmpty()) { - boolean exp = F.first(stores).isWriteToStoreFromDht(); - - for (CacheStoreManager store : stores) { - if (store.isWriteToStoreFromDht() != exp) - return false; - } - } - - return true; - } - - /** * Post-lock closure alias. * * @param <T> Return type. http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 78f517c..d4350d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -47,11 +47,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { @Nullable public Throwable commitError(); /** - * @param e Commit error. - */ - public void commitError(Throwable e); - - /** * @throws IgniteCheckedException If commit failed. */ public void userCommit() throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java index a68006b..eb39edd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java @@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -114,4 +115,19 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter { public String toString() { return S.toString(IgniteTxRemoteSingleStateImpl.class, this); } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + if (entry == null) + return null; + + CacheStoreManager store = entry.context().store(); + + if (store.configured() + && store.isLocal()) { // Only local stores take part at tx on backup node. + return Collections.singleton(store); + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 3e5034b..eaeaccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.processors.cache.transactions; -import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; -import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.jetbrains.annotations.Nullable; @@ -97,13 +95,6 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState } /** {@inheritDoc} */ - @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { - assert false; - - return null; - } - - /** {@inheritDoc} */ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java index 3335b44..4f46831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java @@ -17,13 +17,15 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; - import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -145,4 +147,34 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter { public String toString() { return S.toString(IgniteTxRemoteStateImpl.class, this); } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + int locStoreCnt = cctx.getLocalStoreCount(); + + if (locStoreCnt > 0 && !writeMap.isEmpty()) { + Collection<CacheStoreManager> stores = null; + + for (IgniteTxEntry e : writeMap.values()) { + if (e.skipStore()) + continue; + + CacheStoreManager store = e.context().store(); + + if (store.configured() && store.isLocal()) { + if (stores == null) + stores = new ArrayList<>(locStoreCnt); + + stores.add(store); + + if (stores.size() == locStoreCnt) + break; + } + } + + return stores; + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 1256aa6..f08377b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -47,8 +47,8 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** Active cache IDs. */ private GridLongList activeCacheIds = new GridLongList(); - /** Per-transaction read map. */ + /** Per-transaction read map. */ @GridToStringInclude protected Map<IgniteTxKey, IgniteTxEntry> txMap; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java index 9c4b7b2..eaae2f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java @@ -242,9 +242,10 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs assertTrue("Store is not updated", wait); - assertEquals("Write on wrong node: " + writeMap, 1, writeMap.size()); + assertEquals("Write on wrong node: " + writeMap, locStore ? 2 : 1, writeMap.size()); - assertEquals(expNode, writeMap.keySet().iterator().next()); + if (!locStore) + assertEquals(expNode, writeMap.keySet().iterator().next()); writeMap.clear(); }
