This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8400cc6474b IGNITE-28349 Move all remaining handlers to the request 
handler registry in `PartitionReplicaListener` (#7867)
8400cc6474b is described below

commit 8400cc6474ba1f2a5c06342591a48ba292d0bdef
Author: Ivan Zlenko <[email protected]>
AuthorDate: Wed Mar 25 12:46:10 2026 +0500

    IGNITE-28349 Move all remaining handlers to the request handler registry in 
`PartitionReplicaListener` (#7867)
---
 .../replicator/PartitionReplicaListener.java       | 195 ++++++++++++---------
 1 file changed, 114 insertions(+), 81 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c45b63708ab..b2e29dfff44 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -477,6 +477,42 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 
(ReplicaRequestHandler<ReadOnlyScanRetrieveBatchReplicaRequest>) (req, primacy) 
->
                         processReadOnlyScanRetrieveBatchAction(req, 
primacy.isPrimary()));
 
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                PartitionReplicationMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST,
+                (ReplicaRequestHandler<ReadWriteSingleRowReplicaRequest>) 
(req, primacy) ->
+                        processRwSingleRowRequest(req, primacy));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                
PartitionReplicationMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST,
+                (ReplicaRequestHandler<ReadWriteSingleRowPkReplicaRequest>) 
(req, primacy) ->
+                        processRwSingleRowPkRequest(req, primacy));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                PartitionReplicationMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST,
+                (ReplicaRequestHandler<ReadWriteMultiRowReplicaRequest>) (req, 
primacy) ->
+                        processRwMultiRowRequest(req, primacy));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                
PartitionReplicationMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST,
+                (ReplicaRequestHandler<ReadWriteMultiRowPkReplicaRequest>) 
(req, primacy) ->
+                        processRwMultiRowPkRequest(req, primacy));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST,
+                (ReplicaRequestHandler<ReadWriteSwapRowReplicaRequest>) (req, 
primacy) ->
+                        processRwSwapRowRequest(req, primacy));
+
+        handlersBuilder.addHandler(
+                PartitionReplicationMessageGroup.GROUP_TYPE,
+                
PartitionReplicationMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST,
+                
(ReplicaRequestHandler<ReadWriteScanRetrieveBatchReplicaRequest>) (req, 
primacy) ->
+                        processRwScanRetrieveBatchRequest(req));
+
         requestHandlers = handlersBuilder.build();
     }
 
@@ -575,89 +611,86 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             return roHandler.handle(request, opStartTsIfDirectRo);
         }
 
-        if (request instanceof ReadWriteSingleRowReplicaRequest) {
-            var req = (ReadWriteSingleRowReplicaRequest) request;
-
-            return appendTxCommand(
-                    req.transactionId(),
-                    req.requestType(),
-                    req.full(),
-                    () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
-                            (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
-            );
-        } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
-            var req = (ReadWriteSingleRowPkReplicaRequest) request;
-
-            return appendTxCommand(
-                    req.transactionId(),
-                    req.requestType(),
-                    req.full(),
-                    () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
-                            (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
-            );
-        } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
-            var req = (ReadWriteMultiRowReplicaRequest) request;
-
-            return appendTxCommand(
-                    req.transactionId(),
-                    req.requestType(),
-                    req.full(),
-                    () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
-                            (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
-            );
-        } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
-            var req = (ReadWriteMultiRowPkReplicaRequest) request;
-
-            return appendTxCommand(
-                    req.transactionId(),
-                    req.requestType(),
-                    req.full(),
-                    () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
-                            (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
-            );
-        } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
-            var req = (ReadWriteSwapRowReplicaRequest) request;
-
-            return appendTxCommand(
-                    req.transactionId(),
-                    req.requestType(),
-                    req.full(),
-                    () -> processTwoEntriesAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
-                            (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
-            );
-        } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
-            var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
-
-            // Scan's request.full() has a slightly different semantics than 
the same field in other requests -
-            // it identifies an implicit transaction. Please note that 
request.full() is always false in the following `appendTxCommand`.
-            // We treat SCAN as 2pc and only switch to a 1pc mode if all table 
rows fit in the bucket and the transaction is implicit.
-            // See `req.full() && (err != null || rows.size() < 
req.batchSize())` condition.
-            // If they don't fit the bucket, the transaction is treated as 2pc.
-            replicaTouch(req.transactionId(), req.coordinatorId(), 
req.commitPartitionId().asZonePartitionId(), req.txLabel());
-
-            // Implicit RW scan can be committed locally on a last batch or 
error.
-            return appendTxCommand(req.transactionId(), RW_SCAN, false, () -> 
processScanRetrieveBatchAction(req))
-                    .thenCompose(rows -> {
-                        if (allElementsAreNull(rows)) {
-                            return completedFuture(rows);
-                        } else {
-                            return 
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
-                                    .thenApply(ignored -> {
-                                        metrics.onRead(rows.size(), false, 
true);
+        throw new UnsupportedReplicaRequestException(request.getClass());
+    }
 
-                                        return rows;
-                                    });
-                        }
-                    })
-                    .whenComplete((rows, err) -> {
-                        if (req.full() && (err != null || rows.size() < 
req.batchSize())) {
-                            releaseTxLocks(req.transactionId());
-                        }
-                    });
-        }
+    private CompletableFuture<?> 
processRwSingleRowRequest(ReadWriteSingleRowReplicaRequest req, ReplicaPrimacy 
primacy) {
+        return appendTxCommand(
+                req.transactionId(),
+                req.requestType(),
+                req.full(),
+                () -> processSingleEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
+                        (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
+        );
+    }
 
-        // Unknown request.
-        throw new UnsupportedReplicaRequestException(request.getClass());
+    private CompletableFuture<?> 
processRwSingleRowPkRequest(ReadWriteSingleRowPkReplicaRequest req, 
ReplicaPrimacy primacy) {
+        return appendTxCommand(
+                req.transactionId(),
+                req.requestType(),
+                req.full(),
+                () -> processSingleEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
+                        (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
+        );
+    }
+
+    private CompletableFuture<?> 
processRwMultiRowRequest(ReadWriteMultiRowReplicaRequest req, ReplicaPrimacy 
primacy) {
+        return appendTxCommand(
+                req.transactionId(),
+                req.requestType(),
+                req.full(),
+                () -> processMultiEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
+                        (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
+        );
+    }
+
+    private CompletableFuture<?> 
processRwMultiRowPkRequest(ReadWriteMultiRowPkReplicaRequest req, 
ReplicaPrimacy primacy) {
+        return appendTxCommand(
+                req.transactionId(),
+                req.requestType(),
+                req.full(),
+                () -> processMultiEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
+                        (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
+        );
+    }
+
+    private CompletableFuture<?> 
processRwSwapRowRequest(ReadWriteSwapRowReplicaRequest req, ReplicaPrimacy 
primacy) {
+        return appendTxCommand(
+                req.transactionId(),
+                req.requestType(),
+                req.full(),
+                () -> processTwoEntriesAction(req, 
primacy.leaseStartTime()).whenComplete(
+                        (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
+        );
+    }
+
+    private CompletableFuture<?> 
processRwScanRetrieveBatchRequest(ReadWriteScanRetrieveBatchReplicaRequest req) 
{
+        // Scan's request.full() has a slightly different semantics than the 
same field in other requests -
+        // it identifies an implicit transaction. Please note that 
request.full() is always false in the following `appendTxCommand`.
+        // We treat SCAN as 2pc and only switch to a 1pc mode if all table 
rows fit in the bucket and the transaction is implicit.
+        // See `req.full() && (err != null || rows.size() < req.batchSize())` 
condition.
+        // If they don't fit the bucket, the transaction is treated as 2pc.
+        replicaTouch(req.transactionId(), req.coordinatorId(), 
req.commitPartitionId().asZonePartitionId(), req.txLabel());
+
+        // Implicit RW scan can be committed locally on a last batch or error.
+        return appendTxCommand(req.transactionId(), RW_SCAN, false, () -> 
processScanRetrieveBatchAction(req))
+                .thenCompose(rows -> {
+                    if (allElementsAreNull(rows)) {
+                        return completedFuture(rows);
+                    } else {
+                        return 
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
+                                .thenApply(ignored -> {
+                                    metrics.onRead(rows.size(), false, true);
+
+                                    return rows;
+                                });
+                    }
+                })
+                .whenComplete((rows, err) -> {
+                    if (req.full() && (err != null || rows.size() < 
req.batchSize())) {
+                        releaseTxLocks(req.transactionId());
+                    }
+                });
     }
 
     /**

Reply via email to