This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-17123 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 6025060cba1f19f95016d1320102c010e7f45c4e Author: Andrew Mashenkov <[email protected]> AuthorDate: Tue Jun 7 13:05:14 2022 +0300 Optimize partition counter calculation for WAL data records on backup. --- .../GridDistributedTxRemoteAdapter.java | 51 ++++++++++------------ 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 949c44748a7..2caa69a3e2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.failure.FailureContext; @@ -69,7 +68,6 @@ import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -515,8 +513,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries(); - // Data entry to write to WAL and associated with it TxEntry. - List<T2<DataEntry, IgniteTxEntry>> dataEntries = null; + // Data entry to write to WAL. + List<DataEntry> dataEntries = null; batchStoreCommit(writeMap().values()); @@ -551,6 +549,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter while (true) { try { GridCacheEntryEx cached = txEntry.cached(); + DataEntry dataEntry = null; if (cached == null) txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); @@ -621,23 +620,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (dataEntries == null) dataEntries = new ArrayList<>(entries.size()); - dataEntries.add( - new T2<>( - new DataEntry( - cacheCtx.cacheId(), - txEntry.key(), - val, - op, - nearXidVersion(), - addConflictVersion(writeVersion(), txEntry.conflictVersion()), - 0, - txEntry.key().partition(), - txEntry.updateCounter(), - DataEntry.flags(CU.txOnPrimary(this)) - ), - txEntry - ) + dataEntry = new DataEntry( + cacheCtx.cacheId(), + txEntry.key(), + val, + op, + nearXidVersion(), + addConflictVersion(writeVersion(), txEntry.conflictVersion()), + 0, + txEntry.key().partition(), + txEntry.updateCounter(), + DataEntry.flags(CU.txOnPrimary(this)) ); + + dataEntries.add(dataEntry); } if (op == CREATE || op == UPDATE) { @@ -683,7 +679,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dhtVer, txEntry.updateCounter()); - txEntry.updateCounter(updRes.updateCounter()); + if (dataEntry != null) + dataEntry.partitionCounter(updRes.updateCounter()); if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); @@ -719,7 +716,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dhtVer, txEntry.updateCounter()); - txEntry.updateCounter(updRes.updateCounter()); + if (dataEntry != null) + dataEntry.partitionCounter(updRes.updateCounter()); if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); @@ -801,14 +799,9 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cctx.mvccCaching().onTxFinished(this, true); - if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) { - // Set new update counters for data entries received from persisted tx entries. - List<DataEntry> entriesWithCounters = dataEntries.stream() - .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) - .collect(Collectors.toList()); + if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) + ptr = cctx.wal(true).log(new DataRecord(dataEntries)); - ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters)); - } if (ptr != null) cctx.wal(true).flush(ptr, false);
