This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ab4d60dc6b2 IGNITE-21151 MVCC caching removal (#11140)
ab4d60dc6b2 is described below
commit ab4d60dc6b2ff331c8579f60564fe2f6eef58851
Author: Ilya Shishkov <[email protected]>
AuthorDate: Wed Jan 17 19:41:26 2024 +0300
IGNITE-21151 MVCC caching removal (#11140)
---
.../org/apache/ignite/IgniteSystemProperties.java | 9 -
.../processors/cache/GridCacheProcessor.java | 2 -
.../processors/cache/GridCacheSharedContext.java | 28 --
.../GridDistributedTxRemoteAdapter.java | 4 -
.../dht/GridDhtTxAbstractEnlistFuture.java | 9 +-
.../processors/cache/mvcc/MvccCachingManager.java | 434 ---------------------
.../processors/cache/mvcc/MvccTxEntry.java | 203 ----------
.../cache/transactions/IgniteTxHandler.java | 10 +-
.../IgniteTxImplicitSingleStateImpl.java | 12 -
.../cache/transactions/IgniteTxLocalAdapter.java | 6 -
.../transactions/IgniteTxRemoteStateAdapter.java | 18 -
.../cache/transactions/IgniteTxState.java | 8 -
.../cache/transactions/IgniteTxStateImpl.java | 15 -
13 files changed, 4 insertions(+), 754 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 4a17a95c2c4..f13f3288a2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -90,7 +90,6 @@ import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloa
import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL;
-import static
org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD;
import static
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
import static
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory.DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
@@ -1577,14 +1576,6 @@ public final class IgniteSystemProperties {
defaults = "10")
public static final String IGNITE_ZOOKEEPER_DISCOVERY_MAX_RETRY_COUNT =
"IGNITE_ZOOKEEPER_DISCOVERY_MAX_RETRY_COUNT";
- /**
- * Maximum number for cached MVCC transaction updates. This caching is
used for continuous query with MVCC caches.
- */
- @SystemProperty(value = "Maximum number for cached MVCC transaction
updates. This caching is used " +
- "for continuous query with MVCC caches", type = Integer.class,
- defaults = "" + DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD)
- public static final String IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD =
"IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD";
-
/**
* Try reuse memory on deactivation. Useful in case of huge page memory
region size.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 39aeaea279f..4cae900bef5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -110,7 +110,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Part
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import
org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -3104,7 +3103,6 @@ public class GridCacheProcessor extends
GridProcessorAdapter {
.setTtlCleanupManager(new GridCacheSharedTtlCleanupManager())
.setPartitionsEvictManager(new PartitionsEvictManager())
.setJtaManager(JTA.createOptional())
- .setMvccCachingManager(new MvccCachingManager())
.setDiagnosticManager(new CacheDiagnosticManager())
.setCdcManager(cdcMgr)
.build(kernalCtx, storeSesLsnrs);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index dc5dd65179e..fb334de35af 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -49,7 +49,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
@@ -139,9 +138,6 @@ public class GridCacheSharedContext<K, V> {
/** Partitons evict manager. */
private PartitionsEvictManager evictMgr;
- /** Mvcc caching manager. */
- private MvccCachingManager mvccCachingMgr;
-
/** Cache contexts map. */
private final ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap;
@@ -212,7 +208,6 @@ public class GridCacheSharedContext<K, V> {
* @param evictMgr Partitons evict manager.
* @param jtaMgr JTA manager.
* @param storeSesLsnrs Store session listeners.
- * @param mvccCachingMgr Mvcc caching manager.
*/
private GridCacheSharedContext(
GridKernalContext kernalCtx,
@@ -232,7 +227,6 @@ public class GridCacheSharedContext<K, V> {
PartitionsEvictManager evictMgr,
CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs,
- MvccCachingManager mvccCachingMgr,
CacheDiagnosticManager diagnosticMgr,
CdcManager cdcMgr
) {
@@ -256,7 +250,6 @@ public class GridCacheSharedContext<K, V> {
ioMgr,
ttlMgr,
evictMgr,
- mvccCachingMgr,
diagnosticMgr,
cdcMgr
);
@@ -435,7 +428,6 @@ public class GridCacheSharedContext<K, V> {
ioMgr,
ttlMgr,
evictMgr,
- mvccCachingMgr,
diagnosticMgr,
cdcMgr
);
@@ -485,7 +477,6 @@ public class GridCacheSharedContext<K, V> {
GridCacheIoManager ioMgr,
GridCacheSharedTtlCleanupManager ttlMgr,
PartitionsEvictManager evictMgr,
- MvccCachingManager mvccCachingMgr,
CacheDiagnosticManager diagnosticMgr,
CdcManager cdcMgr
) {
@@ -510,7 +501,6 @@ public class GridCacheSharedContext<K, V> {
this.ioMgr = add(mgrs, ioMgr);
this.ttlMgr = add(mgrs, ttlMgr);
this.evictMgr = add(mgrs, evictMgr);
- this.mvccCachingMgr = add(mgrs, mvccCachingMgr);
}
/**
@@ -866,13 +856,6 @@ public class GridCacheSharedContext<K, V> {
return evictMgr;
}
- /**
- * @return Mvcc transaction enlist caching manager.
- */
- public MvccCachingManager mvccCaching() {
- return mvccCachingMgr;
- }
-
/**
* @return Diagnostic manager.
*/
@@ -1268,9 +1251,6 @@ public class GridCacheSharedContext<K, V> {
/** */
private PartitionsEvictManager evictMgr;
- /** */
- private MvccCachingManager mvccCachingMgr;
-
/** */
private CacheDiagnosticManager diagnosticMgr;
@@ -1305,7 +1285,6 @@ public class GridCacheSharedContext<K, V> {
evictMgr,
jtaMgr,
storeSesLsnrs,
- mvccCachingMgr,
diagnosticMgr,
cdcMgr
);
@@ -1416,13 +1395,6 @@ public class GridCacheSharedContext<K, V> {
return this;
}
- /** */
- public Builder setMvccCachingManager(MvccCachingManager
mvccCachingMgr) {
- this.mvccCachingMgr = mvccCachingMgr;
-
- return this;
- }
-
/** */
public Builder setDiagnosticManager(CacheDiagnosticManager
diagnosticMgr) {
this.diagnosticMgr = diagnosticMgr;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index afc98e047cd..8599b7e4f7c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -778,8 +778,6 @@ public abstract class GridDistributedTxRemoteAdapter
extends IgniteTxAdapter imp
if (txCntrs != null)
cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCntrs.updateCounters());
- cctx.mvccCaching().onTxFinished(this, true);
-
if (!near() && !F.isEmpty(dataEntries) &&
cctx.wal(true) != null)
ptr = cctx.wal(true).log(new
DataRecord(dataEntries));
@@ -925,8 +923,6 @@ public abstract class GridDistributedTxRemoteAdapter
extends IgniteTxAdapter imp
cctx.tm().txHandler().applyPartitionsUpdatesCounters(counters.updateCounters(),
true, false);
state(ROLLED_BACK);
-
- cctx.mvccCaching().onTxFinished(this, false);
}
}
catch (IgniteCheckedException | RuntimeException | Error e) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 4d8444c706a..401d966df62 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -417,8 +417,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T>
extends GridCacheFutureAd
assert entryProc != null || !op.isInvoke();
- boolean needOldVal =
tx.txState().useMvccCaching(cctx.cacheId());
-
GridCacheUpdateTxResult res;
while (true) {
@@ -433,7 +431,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T>
extends GridCacheFutureAd
topVer,
mvccSnapshot,
isMoving(key.partition(), backups),
- needOldVal,
+ false,
filter,
needResult());
@@ -455,7 +453,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T>
extends GridCacheFutureAd
op.cacheOperation(),
isMoving(key.partition(), backups),
op.noCreate(),
- needOldVal,
+ false,
filter,
needResult(),
keepBinary);
@@ -632,9 +630,6 @@ public abstract class GridDhtTxAbstractEnlistFuture<T>
extends GridCacheFutureAd
|| op == EnlistOperation.LOCK)
return;
- cctx.shared().mvccCaching().addEnlisted(entry.key(),
updRes.newValue(), 0, 0, lockVer,
- updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot,
cctx.cacheId(), tx, null, -1);
-
addToBatch(entry.key(), val, updRes.mvccHistory(),
entry.context().cacheId(), backups);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
deleted file mode 100644
index de1da54c87c..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCachingManager.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
-import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
-import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
-import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
-import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridIntList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import static
org.apache.ignite.IgniteSystemProperties.IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
-import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
-
-/**
- * Manager for caching MVCC transaction updates. This updates can be used
further in CQ, DR and other places.
- */
-public class MvccCachingManager extends GridCacheSharedManagerAdapter {
- /** @see IgniteSystemProperties#IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD */
- public static final int DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD = 20_000;
-
- /** Maximum possible transaction size when caching is enabled. */
- public static final int TX_SIZE_THRESHOLD =
IgniteSystemProperties.getInteger(IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD,
- DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD);
-
- /** Cached enlist values. */
- private final Map<GridCacheVersion, EnlistBuffer> enlistCache = new
ConcurrentHashMap<>();
-
- /** Counters map. Used for OOM prevention caused by the big transactions.
*/
- private final Map<TxKey, AtomicInteger> cntrs = new ConcurrentHashMap<>();
-
- /**
- * Adds enlisted tx entry to cache.
- *
- * @param key Key.
- * @param val Value.
- * @param ttl Time to live.
- * @param expireTime Expire time.
- * @param ver Version.
- * @param oldVal Old value.
- * @param primary Flag whether this is a primary node.
- * @param topVer Topology version.
- * @param mvccVer Mvcc version.
- * @param cacheId Cache id.
- * @param tx Transaction.
- * @param futId Dht future id.
- * @param batchNum Batch number (for batches reordering prevention).
- * @throws IgniteCheckedException If failed.
- */
- public void addEnlisted(KeyCacheObject key,
- @Nullable CacheObject val,
- long ttl,
- long expireTime,
- GridCacheVersion ver,
- CacheObject oldVal,
- boolean primary,
- AffinityTopologyVersion topVer,
- MvccVersion mvccVer,
- int cacheId,
- IgniteInternalTx tx,
- IgniteUuid futId,
- int batchNum) throws IgniteCheckedException {
- assert key != null;
- assert mvccVer != null;
- assert tx != null;
-
- if (log.isDebugEnabled()) {
- log.debug("Added entry to mvcc cache: [key=" + key + ", val=" +
val + ", oldVal=" + oldVal +
- ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId="
+ cacheId + ", ver=" + ver + ']');
- }
-
- // Do not cache updates if there are no DR or CQ were enabled when
cache was added as active for the current tx.
- if (!tx.txState().useMvccCaching(cacheId))
- return;
-
- AtomicInteger cntr = cntrs.computeIfAbsent(new
TxKey(mvccVer.coordinatorVersion(), mvccVer.counter()),
- v -> new AtomicInteger());
-
- if (cntr.incrementAndGet() > TX_SIZE_THRESHOLD)
- throw new IgniteCheckedException("Transaction is too large.
Consider reducing transaction size or " +
- "turning off continuous queries and datacenter replication
[size=" + cntr.get() + ", txXid=" + ver + ']');
-
- MvccTxEntry e = new MvccTxEntry(key, val, ttl, expireTime, ver,
oldVal, primary, topVer, mvccVer, cacheId);
-
- EnlistBuffer cached = enlistCache.computeIfAbsent(ver, v -> new
EnlistBuffer());
-
- cached.add(primary ? null : futId, primary ? -1 : batchNum, e);
- }
-
- /**
- * @param tx Transaction.
- * @param commit {@code True} if commit.
- */
- public void onTxFinished(IgniteInternalTx tx, boolean commit) throws
IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx
+ ']');
-
- if (tx.system() || tx.internal() || tx.mvccSnapshot() == null)
- return;
-
- cntrs.remove(new TxKey(tx.mvccSnapshot().coordinatorVersion(),
tx.mvccSnapshot().counter()));
-
- EnlistBuffer buf = enlistCache.remove(tx.xidVersion());
-
- Map<Integer, Map<KeyCacheObject, MvccTxEntry>> allCached = buf == null
? null : buf.getCached();
-
- TxCounters txCntrs = tx.txCounters(false);
-
- Collection<PartitionUpdateCountersMessage> cntrsColl = txCntrs == null
? null : txCntrs.updateCounters();
-
- if (txCntrs == null || F.isEmpty(cntrsColl))
- return;
-
- GridIntList cacheIds = tx.txState().cacheIds();
-
- assert cacheIds != null;
-
- for (int i = 0; i < cacheIds.size(); i++) {
- int cacheId = cacheIds.get(i);
-
- GridCacheContext ctx0 = cctx.cacheContext(cacheId);
-
- assert ctx0 != null;
-
- ctx0.group().listenerLock().readLock().lock();
-
- try {
- boolean hasListeners = ctx0.hasContinuousQueryListeners(tx);
- boolean drEnabled = ctx0.isDrEnabled();
-
- if (!hasListeners && !drEnabled)
- continue; // There are no listeners to notify.
-
- // Get cached entries for the given cache.
- Map<KeyCacheObject, MvccTxEntry> cached = allCached == null ?
null : allCached.get(cacheId);
-
- Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap =
countersPerPartition(cntrsColl);
-
- Map<Integer, T2<AtomicLong, Long>> cntrPerCache =
cntrsMap.get(cacheId);
-
- if (F.isEmpty(cntrPerCache))
- continue; // No updates were made for this cache.
-
- boolean fakeEntries = false;
-
- if (F.isEmpty(cached)) {
- if (log.isDebugEnabled())
- log.debug("Transaction updates were not cached fully
(this can happen when listener started" +
- " during the transaction execution). [tx=" + tx +
']');
-
- if (hasListeners) {
- cached = createFakeCachedEntries(cntrPerCache, tx,
cacheId); // Create fake update entries if we have CQ listeners.
-
- fakeEntries = true;
- }
- else
- continue; // Nothing to do further if tx is not cached
entirely and there are no any CQ listeners.
- }
-
- if (F.isEmpty(cached))
- continue;
-
- // Feed CQ & DR with entries.
- for (Map.Entry<KeyCacheObject, MvccTxEntry> entry :
cached.entrySet()) {
- MvccTxEntry e = entry.getValue();
-
- assert e.key().partition() != -1;
-
- assert cntrPerCache != null;
- assert e.cacheId() == cacheId;
-
- T2<AtomicLong, Long> cntr =
cntrPerCache.get(e.key().partition());
-
- long resCntr = cntr.getKey().incrementAndGet();
-
- assert resCntr <= cntr.getValue();
-
- e.updateCounter(resCntr);
-
- if (ctx0.group().sharedGroup()) {
- ctx0.group().onPartitionCounterUpdate(cacheId,
e.key().partition(), resCntr,
- tx.topologyVersion(), tx.local());
- }
-
- if (log.isDebugEnabled())
- log.debug("Process cached entry:" + e);
-
- // DR
- if (ctx0.isDrEnabled() && !fakeEntries) {
- ctx0.dr().replicate(e.key(), e.value(), e.ttl(),
e.expireTime(), e.version(),
- tx.local() ? DR_PRIMARY : DR_BACKUP,
e.topologyVersion());
- }
-
- // CQ
- CacheContinuousQueryManager contQryMgr =
ctx0.continuousQueries();
-
- if (ctx0.continuousQueries().notifyContinuousQueries(tx)) {
- Map<UUID, CacheContinuousQueryListener> lsnrCol =
continuousQueryListeners(ctx0, tx);
-
- if (!F.isEmpty(lsnrCol)) {
- contQryMgr.onEntryUpdated(
- lsnrCol,
- e.key(),
- commit ? e.value() : null, // Force skip
update counter if rolled back.
- commit ? e.oldValue() : null, // Force skip
update counter if rolled back.
- false,
- e.key().partition(),
- tx.local(),
- false,
- e.updateCounter(),
- null,
- e.topologyVersion());
- }
- }
- }
- }
- finally {
- ctx0.group().listenerLock().readLock().unlock();
- }
- }
- }
-
- /**
- * Calculates counters updates per cache and partition: cacheId -> partId
-> initCntr -> cntr + delta.
- *
- * @param cntrsColl Counters collection.
- * @return Counters updates per cache and partition.
- */
- private Map<Integer, Map<Integer, T2<AtomicLong, Long>>>
countersPerPartition(
- Collection<PartitionUpdateCountersMessage> cntrsColl) {
- //
- Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new
HashMap<>();
-
- for (PartitionUpdateCountersMessage msg : cntrsColl) {
- for (int i = 0; i < msg.size(); i++) {
- Map<Integer, T2<AtomicLong, Long>> cntrPerPart =
- cntrsMap.computeIfAbsent(msg.cacheId(), k -> new
HashMap<>());
-
- T2 prev = cntrPerPart.put(msg.partition(i),
- new T2<>(new AtomicLong(msg.initialCounter(i)),
msg.initialCounter(i) + msg.updatesCount(i)));
-
- assert prev == null;
- }
- }
-
- return cntrsMap;
- }
-
- /**
- * If transaction was not cached entirely (if listener was set during tx
execution), we should feed the CQ engine
- * with a fake entries prepared by this method.
- *
- * @param cntrPerCache Update counters deltas made by transaction.
- * @param tx Transaction.
- * @param cacheId Cache id.
- * @return Fake entries for each tx update.
- */
- private Map<KeyCacheObject, MvccTxEntry>
createFakeCachedEntries(Map<Integer, T2<AtomicLong, Long>> cntrPerCache,
- IgniteInternalTx tx, int cacheId) {
- Map<KeyCacheObject, MvccTxEntry> fakeCached = new HashMap<>();
-
- for (Map.Entry<Integer, T2<AtomicLong, Long>> e :
cntrPerCache.entrySet()) {
- int part = e.getKey();
-
- long startCntr = e.getValue().get1().get(); // Init update counter.
- long endCntr = e.getValue().get1().get() + e.getValue().get2(); //
Init update counter + delta.
-
- for (long i = startCntr; i < endCntr; i++) {
- KeyCacheObject fakeKey = new KeyCacheObjectImpl("", null,
part);
-
- MvccTxEntry fakeEntry = new MvccTxEntry(fakeKey, null, 0, 0,
tx.xidVersion(), null,
- tx.local(), tx.topologyVersion(), tx.mvccSnapshot(),
cacheId);
-
- fakeCached.put(fakeKey, fakeEntry);
- }
- }
-
- return fakeCached;
- }
-
- /**
- * @param ctx0 Cache context.
- * @param tx Transaction.
- * @return Map of listeners to be notified by this update.
- */
- public Map<UUID, CacheContinuousQueryListener>
continuousQueryListeners(GridCacheContext ctx0,
- @Nullable IgniteInternalTx tx) {
- return ctx0.continuousQueries().notifyContinuousQueries(tx) ?
- ctx0.continuousQueries().updateListeners(!ctx0.userCache(), false)
: null;
- }
-
- /**
- * Buffer for collecting enlisted entries. The main goal of this buffer is
to fix reordering of dht enlist requests
- * on backups.
- */
- private static class EnlistBuffer {
- /** Last DHT future id. */
- private IgniteUuid lastFutId;
-
- /** Main buffer for entries. CacheId -> entriesMap. */
- @GridToStringInclude
- private Map<Integer, Map<KeyCacheObject, MvccTxEntry>> cached = new
TreeMap<>();
-
- /** Pending entries. BatchId -> entriesMap. */
- @GridToStringInclude
- private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;
-
- /**
- * Adds entry to caching buffer.
- *
- * @param futId Future id.
- * @param batchNum Batch number.
- * @param e Entry.
- */
- synchronized void add(IgniteUuid futId, int batchNum, MvccTxEntry e) {
- KeyCacheObject key = e.key();
-
- if (batchNum >= 0) {
- /*
- * Assume that batches within one future may be reordered. But
batches between futures cannot be
- * reordered. This means that if batches from the new DHT
future has arrived, all batches from the
- * previous one has already been collected.
- */
- if (lastFutId != null && !lastFutId.equals(futId)) { //
Request from new DHT future arrived.
- lastFutId = futId;
-
- // Flush pending for previous future.
- flushPending();
- }
-
- if (pending == null)
- pending = new TreeMap<>();
-
- MvccTxEntry prev = pending.computeIfAbsent(batchNum, k -> new
LinkedHashMap<>()).put(key, e);
-
- if (prev != null && prev.oldValue() != null)
- e.oldValue(prev.oldValue());
- }
- else { // batchNum == -1 means no reordering (e.g. this is a
primary node).
- assert batchNum == -1;
-
- Map<KeyCacheObject, MvccTxEntry> entriesForCache =
cached.computeIfAbsent(e.cacheId(), k -> new LinkedHashMap<>());
-
- MvccTxEntry prev = entriesForCache.put(key, e);
-
- // If key is updated more than once within transaction, we
should copy old value
- // (the value existed before tx started) from the previous
entry to the new one.
- if (prev != null && prev.oldValue() != null)
- e.oldValue(prev.oldValue());
- }
- }
-
- /**
- * @return Cached entries map.
- */
- synchronized Map<Integer, Map<KeyCacheObject, MvccTxEntry>>
getCached() {
- flushPending();
-
- return cached;
- }
-
- /**
- * Flush pending updates to cached map.
- */
- private void flushPending() {
- if (F.isEmpty(pending))
- return;
-
- for (Map.Entry<Integer, Map<KeyCacheObject, MvccTxEntry>> entry :
pending.entrySet()) {
- Map<KeyCacheObject, MvccTxEntry> vals = entry.getValue();
-
- for (Map.Entry<KeyCacheObject, MvccTxEntry> e :
vals.entrySet()) {
- Map<KeyCacheObject, MvccTxEntry> entriesForCache = cached
- .computeIfAbsent(e.getValue().cacheId(), k -> new
LinkedHashMap<>());
-
- MvccTxEntry prev = entriesForCache.put(e.getKey(),
e.getValue());
-
- // If key is updated more than once within transaction, we
should copy old value
- // (the value existed before tx started) from the previous
entry to the new one.
- if (prev != null && prev.oldValue() != null)
- e.getValue().oldValue(prev.oldValue());
- }
- }
-
- pending.clear();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(EnlistBuffer.class, this);
- }
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccTxEntry.java
deleted file mode 100644
index 28864f8dbca..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccTxEntry.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Holder for the enlisted entries data.
- */
-public class MvccTxEntry {
- /** */
- private KeyCacheObject key;
-
- /** */
- private CacheObject val;
-
- /** */
- private int cacheId;
-
- /** */
- private GridCacheVersion ver;
-
- /** */
- private CacheObject oldVal;
-
- /** */
- private boolean primary;
-
- /** */
- private AffinityTopologyVersion topVer;
-
- /** */
- private MvccVersion mvccVer;
-
- /** */
- private long ttl;
-
- /** */
- private long expireTime;
-
- /** */
- private long updCntr;
-
- /**
- * @param key Key.
- * @param val New value.
- * @param ttl Time to live.
- * @param expireTime Expire time.
- * @param ver Tx grig cache version.
- * @param oldVal Old value.
- * @param primary {@code True} if this is a primary node.
- * @param topVer Topology version.
- * @param mvccVer Mvcc version.
- * @param cacheId Cache id.
- */
- public MvccTxEntry(KeyCacheObject key,
- @Nullable CacheObject val,
- long ttl,
- long expireTime,
- GridCacheVersion ver,
- CacheObject oldVal,
- boolean primary,
- AffinityTopologyVersion topVer,
- MvccVersion mvccVer,
- int cacheId) {
- assert key != null;
- assert mvccVer != null;
-
- this.key = key;
- this.val = val;
- this.ttl = ttl;
- this.expireTime = expireTime;
- this.ver = ver;
- this.oldVal = oldVal;
- this.primary = primary;
- this.topVer = topVer;
- this.mvccVer = mvccVer;
- this.cacheId = cacheId;
- }
-
- /**
- * @return Versioned entry (for DR).
- */
- public GridCacheRawVersionedEntry versionedEntry() {
- return new GridCacheRawVersionedEntry(key, val, ttl, expireTime, ver);
- }
-
- /**
- * @return Key.
- */
- public KeyCacheObject key() {
- return key;
- }
-
- /**
- * @return Value.
- */
- public CacheObject value() {
- return val;
- }
-
- /**
- * @return Time to live.
- */
- public long ttl() {
- return ttl;
- }
-
- /**
- * @return Expire time.
- */
- public long expireTime() {
- return expireTime;
- }
-
- /**
- * @return Version.
- */
- public GridCacheVersion version() {
- return ver;
- }
-
- /**
- * @return Old value.
- */
- public CacheObject oldValue() {
- return oldVal;
- }
-
- /**
- * @param oldVal Old value.
- */
- public void oldValue(CacheObject oldVal) {
- this.oldVal = oldVal;
- }
-
- /**
- * @return {@code True} if this entry is created on a primary node.
- */
- public boolean isPrimary() {
- return primary;
- }
-
- /**
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Mvcc version.
- */
- public MvccVersion mvccVersion() {
- return mvccVer;
- }
-
- /**
- * @return Cache id.
- */
- public int cacheId() {
- return cacheId;
- }
-
- /**
- * @return Update counter.
- */
- public long updateCounter() {
- return updCntr;
- }
-
- /**
- * @param updCntr Update counter.
- */
- public void updateCounter(long updCntr) {
- this.updCntr = updCntr;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MvccTxEntry.class, this);
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index f644c72e04a..f337f40a0e1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1920,8 +1920,6 @@ public class IgniteTxHandler {
EntryProcessor entryProc = null;
Object[] invokeArgs = null;
- boolean needOldVal =
tx.txState().useMvccCaching(ctx.cacheId());
-
Message val0 = vals != null ? vals.get(i) : null;
CacheEntryInfoCollection entries =
@@ -1958,7 +1956,7 @@ public class IgniteTxHandler {
tx.topologyVersion(),
snapshot,
false,
- needOldVal,
+ false,
null,
false);
@@ -1980,7 +1978,7 @@ public class IgniteTxHandler {
op.cacheOperation(),
false,
false,
- needOldVal,
+ false,
null,
false,
false);
@@ -2014,10 +2012,6 @@ public class IgniteTxHandler {
}
}
- if (!updRes.filtered())
- ctx.shared().mvccCaching().addEnlisted(key,
updRes.newValue(), 0, 0, tx.xidVersion(),
- updRes.oldValue(), tx.local(),
tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
-
assert updRes.updateFuture() == null : "Entry should
not be locked on the backup";
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 63f2c6bca14..c10f606b674 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -59,9 +59,6 @@ public class IgniteTxImplicitSingleStateImpl extends
IgniteTxLocalStateAdapter {
/** */
private boolean recovery;
- /** */
- private volatile boolean useMvccCaching;
-
/** {@inheritDoc} */
@Override public void addActiveCache(GridCacheContext ctx, boolean
recovery, IgniteTxAdapter tx) {
assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name()
+ ", new=" + ctx.name() + ']';
@@ -71,8 +68,6 @@ public class IgniteTxImplicitSingleStateImpl extends
IgniteTxLocalStateAdapter {
this.recovery = recovery;
tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
-
- useMvccCaching = false;
}
/** {@inheritDoc} */
@@ -316,13 +311,6 @@ public class IgniteTxImplicitSingleStateImpl extends
IgniteTxLocalStateAdapter {
return entry != null ? entry.get(0) : null;
}
- /** {@inheritDoc} */
- @Override public boolean useMvccCaching(int cacheId) {
- assert cacheCtx == null || cacheCtx.cacheId() == cacheId;
-
- return useMvccCaching;
- }
-
/** {@inheritDoc} */
@Override public boolean recovery() {
return recovery;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d34490e4461..5f9ef732c31 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -868,8 +868,6 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
// Apply cache sizes only for primary nodes. Update counters
were applied on prepare state.
applyTxSizes();
- cctx.mvccCaching().onTxFinished(this, true);
-
if (ptr != null)
cctx.wal(true).flush(ptr, false);
}
@@ -993,8 +991,6 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
assert !needsCompletedVersions || committedVers != null :
"Missing committed versions for transaction: " + this;
assert !needsCompletedVersions || rolledbackVers != null :
"Missing rolledback versions for transaction: " + this;
}
-
- cctx.mvccCaching().onTxFinished(this, commit);
}
}
@@ -1050,8 +1046,6 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
cctx.tm().rollbackTx(this, clearThreadMap,
skipCompletedVersions());
- cctx.mvccCaching().onTxFinished(this, false);
-
if (!internal()) {
Collection<CacheStoreManager> stores = txState.stores(cctx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index d29da192ca1..c1d973eeade 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jetbrains.annotations.Nullable;
@@ -36,12 +35,6 @@ public abstract class IgniteTxRemoteStateAdapter implements
IgniteTxRemoteState
/** Active cache IDs. */
private GridIntList activeCacheIds = new GridIntList();
- /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
- private GridIntList mvccCachingCacheIds = new GridIntList();
-
- /** */
- protected boolean mvccEnabled;
-
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return false;
@@ -87,12 +80,6 @@ public abstract class IgniteTxRemoteStateAdapter implements
IgniteTxRemoteState
int cacheId = cctx.cacheId();
- boolean mvccTx = tx.mvccSnapshot() != null;
-
- assert activeCacheIds.isEmpty() || mvccEnabled == mvccTx;
-
- mvccEnabled = mvccTx;
-
// Check if we can enlist new cache to transaction.
if (!activeCacheIds.contains(cacheId))
activeCacheIds.add(cacheId);
@@ -129,9 +116,4 @@ public abstract class IgniteTxRemoteStateAdapter implements
IgniteTxRemoteState
@Override public void onTxEnd(GridCacheSharedContext cctx,
IgniteInternalTx tx, boolean commit) {
assert false;
}
-
- /** {@inheritDoc} */
- @Override public boolean useMvccCaching(int cacheId) {
- return mvccCachingCacheIds.contains(cacheId);
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 0a32e96656f..73ec525d261 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -189,11 +188,4 @@ public interface IgniteTxState {
* @return {@code True} if transaction is empty.
*/
public boolean empty();
-
- /**
- * @param cacheId Cache id.
- * @return {@code True} if it is need to store in the heap updates made by
the current TX for the given cache.
- * These updates will be used for CQ and DR. See {@link
MvccCachingManager}.
- */
- public boolean useMvccCaching(int cacheId);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 310193a8cc7..e5dbfebe84a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -36,7 +36,6 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -75,13 +74,6 @@ public class IgniteTxStateImpl extends
IgniteTxLocalStateAdapter {
@GridToStringInclude
private Boolean recovery;
- /** */
- @GridToStringInclude
- private Boolean mvccEnabled;
-
- /** Cache ids used for mvcc caching. See {@link MvccCachingManager}. */
- private final GridIntList mvccCachingCacheIds = new GridIntList();
-
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return false;
@@ -222,8 +214,6 @@ public class IgniteTxStateImpl extends
IgniteTxLocalStateAdapter {
this.recovery = recovery;
- mvccEnabled = false;
-
// Check if we can enlist new cache to transaction.
if (!activeCacheIds.contains(cacheId)) {
String err = cctx.verifyTxCompatibility(tx, activeCacheIds,
cacheCtx);
@@ -482,11 +472,6 @@ public class IgniteTxStateImpl extends
IgniteTxLocalStateAdapter {
return writeView != null && writeView.size() == 1 ?
F.firstValue(writeView) : null;
}
- /** {@inheritDoc} */
- @Override public boolean useMvccCaching(int cacheId) {
- return mvccCachingCacheIds.contains(cacheId);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteTxStateImpl.class, this, "txMap",
allEntriesCopy());