This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17123
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 6025060cba1f19f95016d1320102c010e7f45c4e
Author: Andrew Mashenkov <[email protected]>
AuthorDate: Tue Jun 7 13:05:14 2022 +0300

    Optimize partition counter calculation for WAL data records on backup.
---
 .../GridDistributedTxRemoteAdapter.java            | 51 ++++++++++------------
 1 file changed, 22 insertions(+), 29 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 949c44748a7..2caa69a3e2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.failure.FailureContext;
@@ -69,7 +68,6 @@ import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -515,8 +513,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries();
 
-                        // Data entry to write to WAL and associated with it TxEntry.
-                        List<T2<DataEntry, IgniteTxEntry>> dataEntries = null;
+                        // Data entry to write to WAL.
+                        List<DataEntry> dataEntries = null;
 
                         batchStoreCommit(writeMap().values());
 
@@ -551,6 +549,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                             while (true) {
                                 try {
                                     GridCacheEntryEx cached = txEntry.cached();
+                                    DataEntry dataEntry = null;
 
                                     if (cached == null)
                                         txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
@@ -621,23 +620,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         if (dataEntries == null)
                                             dataEntries = new ArrayList<>(entries.size());
 
-                                        dataEntries.add(
-                                            new T2<>(
-                                                new DataEntry(
-                                                    cacheCtx.cacheId(),
-                                                    txEntry.key(),
-                                                    val,
-                                                    op,
-                                                    nearXidVersion(),
-                                                    addConflictVersion(writeVersion(), txEntry.conflictVersion()),
-                                                    0,
-                                                    txEntry.key().partition(),
-                                                    txEntry.updateCounter(),
-                                                    DataEntry.flags(CU.txOnPrimary(this))
-                                                ),
-                                                txEntry
-                                            )
+                                        dataEntry = new DataEntry(
+                                            cacheCtx.cacheId(),
+                                            txEntry.key(),
+                                            val,
+                                            op,
+                                            nearXidVersion(),
+                                            addConflictVersion(writeVersion(), txEntry.conflictVersion()),
+                                            0,
+                                            txEntry.key().partition(),
+                                            txEntry.updateCounter(),
+                                            DataEntry.flags(CU.txOnPrimary(this))
                                         );
+
+                                        dataEntries.add(dataEntry);
                                     }
 
                                     if (op == CREATE || op == UPDATE) {
@@ -683,7 +679,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 dhtVer,
                                                 txEntry.updateCounter());
 
-                                            txEntry.updateCounter(updRes.updateCounter());
+                                            if (dataEntry != null)
+                                                dataEntry.partitionCounter(updRes.updateCounter());
 
                                             if (updRes.loggedPointer() != null)
                                                 ptr = updRes.loggedPointer();
@@ -719,7 +716,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             dhtVer,
                                             txEntry.updateCounter());
 
-                                        txEntry.updateCounter(updRes.updateCounter());
+                                        if (dataEntry != null)
+                                            dataEntry.partitionCounter(updRes.updateCounter());
 
                                         if (updRes.loggedPointer() != null)
                                             ptr = updRes.loggedPointer();
@@ -801,14 +799,9 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         cctx.mvccCaching().onTxFinished(this, true);
 
-                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null) {
-                            // Set new update counters for data entries received from persisted tx entries.
-                            List<DataEntry> entriesWithCounters = dataEntries.stream()
-                                .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
-                                .collect(Collectors.toList());
+                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal(true) != null)
+                            ptr = cctx.wal(true).log(new DataRecord(dataEntries));
 
-                            ptr = cctx.wal(true).log(new DataRecord(entriesWithCounters));
-                        }
 
                         if (ptr != null)
                             cctx.wal(true).flush(ptr, false);

Reply via email to