This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3de5097fda IGNITE-23257 Switch to partition-operation thread to handle client messages (#4559)
3de5097fda is described below

commit 3de5097fda6183a11b8ebf46671b0c9f22430d0b
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Oct 16 10:26:12 2024 +0300

    IGNITE-23257 Switch to partition-operation thread to handle client messages (#4559)
---
 .../apache/ignite/client/handler/TestServer.java   |   3 +-
 .../ignite/client/handler/ClientHandlerModule.java |  12 +-
 .../handler/ClientInboundMessageHandler.java       | 163 +++++++++++++++------
 .../ignite/client/TestClientHandlerModule.java     |   3 +-
 .../java/org/apache/ignite/client/TestServer.java  |   3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 6 files changed, 135 insertions(+), 52 deletions(-)

diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 4f3a5d0dc7..bef42569f2 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -138,7 +138,8 @@ public class TestServer {
                 mock(CatalogService.class),
                 mock(PlacementDriver.class),
                 clientConnectorConfiguration,
-                new TestLowWatermark()
+                new TestLowWatermark(),
+                Runnable::run
         );
 
         module.startAsync(componentContext).join();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 14a261db95..2d95e71092 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -36,6 +36,7 @@ import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -125,6 +126,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
 
     private final ClientConnectorConfiguration clientConnectorConfiguration;
 
+    private final Executor partitionOperationsExecutor;
+
     @TestOnly
     @SuppressWarnings("unused")
     private volatile ChannelHandler handler;
@@ -144,6 +147,7 @@ public class ClientHandlerModule implements IgniteComponent 
{
      * @param clockService Clock service.
      * @param clientConnectorConfiguration Configuration of the connector.
      * @param lowWatermark Low watermark.
+     * @param partitionOperationsExecutor Executor for a partition operation.
      */
     public ClientHandlerModule(
             QueryProcessor queryProcessor,
@@ -161,7 +165,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
             CatalogService catalogService,
             PlacementDriver placementDriver,
             ClientConnectorConfiguration clientConnectorConfiguration,
-            LowWatermark lowWatermark
+            LowWatermark lowWatermark,
+            Executor partitionOperationsExecutor
     ) {
         assert igniteTables != null;
         assert queryProcessor != null;
@@ -178,6 +183,7 @@ public class ClientHandlerModule implements IgniteComponent 
{
         assert placementDriver != null;
         assert clientConnectorConfiguration != null;
         assert lowWatermark != null;
+        assert partitionOperationsExecutor != null;
 
         this.queryProcessor = queryProcessor;
         this.igniteTables = igniteTables;
@@ -195,6 +201,7 @@ public class ClientHandlerModule implements IgniteComponent 
{
         this.primaryReplicaTracker = new 
ClientPrimaryReplicaTracker(placementDriver, catalogService, clockService, 
schemaSyncService,
                 lowWatermark);
         this.clientConnectorConfiguration = clientConnectorConfiguration;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
     }
 
     /** {@inheritDoc} */
@@ -384,7 +391,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
                 schemaSyncService,
                 catalogService,
                 connectionId,
-                primaryReplicaTracker
+                primaryReplicaTracker,
+                partitionOperationsExecutor
         );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 2f17bd03c7..8290f9d423 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -211,6 +212,8 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
 
     private final long connectionId;
 
+    private final Executor partitionOperationsExecutor;
+
     /**
      * Constructor.
      *
@@ -239,7 +242,8 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
             long connectionId,
-            ClientPrimaryReplicaTracker primaryReplicaTracker
+            ClientPrimaryReplicaTracker primaryReplicaTracker,
+            Executor partitionOperationsExecutor
     ) {
         assert igniteTables != null;
         assert igniteTransactions != null;
@@ -254,6 +258,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         assert schemaSyncService != null;
         assert catalogService != null;
         assert primaryReplicaTracker != null;
+        assert partitionOperationsExecutor != null;
 
         this.igniteTables = igniteTables;
         this.igniteTransactions = igniteTransactions;
@@ -266,6 +271,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         this.authenticationManager = authenticationManager;
         this.clockService = clockService;
         this.primaryReplicaTracker = primaryReplicaTracker;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
 
         jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
         jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(
@@ -310,13 +316,12 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         var unpacker = new ClientMessageUnpacker(byteBuf);
         metrics.bytesReceivedAdd(byteBuf.readableBytes() + 
ClientMessageCommon.HEADER_SIZE);
 
-        // Packer buffer is released by Netty on send, or by inner exception 
handlers below.
-        var packer = getPacker(ctx.alloc());
-
         switch (state) {
             case STATE_BEFORE_HANDSHAKE:
                 state = STATE_HANDSHAKE_REQUESTED;
                 
metrics.bytesReceivedAdd(ClientMessageCommon.MAGIC_BYTES.length);
+                // Packer buffer is released by Netty on send, or by inner 
exception handlers below.
+                var packer = getPacker(ctx.alloc());
                 handshake(ctx, unpacker, packer);
 
                 break;
@@ -327,7 +332,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
 
             case STATE_HANDSHAKE_RESPONSE_SENT:
                 assert clientContext != null : "Client context != null";
-                processOperation(ctx, unpacker, packer);
+                processOperation(ctx, unpacker);
 
                 break;
 
@@ -582,9 +587,11 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         return new ClientMessagePacker(alloc.buffer());
     }
 
-    private void processOperation(ChannelHandlerContext ctx, 
ClientMessageUnpacker in, ClientMessagePacker out) {
+    private void processOperation(ChannelHandlerContext ctx, 
ClientMessageUnpacker in) {
         long requestId = -1;
         int opCode = -1;
+        ClientMessagePacker out = null;
+
         metrics.requestsActiveIncrement();
 
         try {
@@ -596,56 +603,37 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
                         + ", remoteAddress=" + ctx.channel().remoteAddress() + 
"]");
             }
 
-            out.packLong(requestId);
-            writeFlags(out, ctx, false, false);
-
-            // Observable timestamp should be calculated after the operation 
is processed; reserve space, write later.
-            int observableTimestampIdx = out.reserveLong();
-
-            CompletableFuture fut = processOperation(in, out, opCode, 
requestId);
-
-            if (fut == null) {
-                // Operation completed synchronously.
-                in.close();
-                out.setLong(observableTimestampIdx, observableTimestamp(out));
-                write(out, ctx);
-
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Client request processed synchronously [id=" + 
requestId + ", op=" + opCode
-                            + ", remoteAddress=" + 
ctx.channel().remoteAddress() + "]");
-                }
+            if (isPartitionOperation(opCode)) {
+                long requestId0 = requestId;
+                int opCode0 = opCode;
 
-                metrics.requestsProcessedIncrement();
-                metrics.requestsActiveDecrement();
-            } else {
-                var reqId = requestId;
-                var op = opCode;
+                partitionOperationsExecutor.execute(() -> {
+                    // Packer buffer is released by Netty on send, or by inner 
exception handlers below.
+                    var outPacker = getPacker(ctx.alloc());
 
-                fut.whenComplete((Object res, Object err) -> {
-                    in.close();
-                    metrics.requestsActiveDecrement();
+                    try {
+                        processOperationInternal(ctx, in, outPacker, 
requestId0, opCode0);
+                    } catch (Throwable t) {
+                        in.close();
+                        outPacker.close();
 
-                    if (err != null) {
-                        out.close();
-                        writeError(reqId, op, (Throwable) err, ctx, false);
+                        writeError(requestId0, opCode0, t, ctx, false);
 
                         metrics.requestsFailedIncrement();
-                    } else {
-                        out.setLong(observableTimestampIdx, 
observableTimestamp(out));
-                        write(out, ctx);
-
-                        metrics.requestsProcessedIncrement();
-
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Client request processed [id=" + reqId 
+ ", op=" + op
-                                    + ", remoteAddress=" + 
ctx.channel().remoteAddress() + "]");
-                        }
                     }
                 });
+            } else {
+                // Packer buffer is released by Netty on send, or by inner 
exception handlers below.
+                out = getPacker(ctx.alloc());
+
+                processOperationInternal(ctx, in, out, requestId, opCode);
             }
         } catch (Throwable t) {
             in.close();
-            out.close();
+
+            if (out != null) {
+                out.close();
+            }
 
             writeError(requestId, opCode, t, ctx, false);
 
@@ -831,6 +819,89 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         }
     }
 
+    private boolean isPartitionOperation(int opCode) {
+        return opCode == ClientOp.TABLES_GET
+                || opCode == ClientOp.TUPLE_UPSERT
+                || opCode == ClientOp.TUPLE_GET
+                || opCode == ClientOp.TUPLE_UPSERT_ALL
+                || opCode == ClientOp.TUPLE_GET_ALL
+                || opCode == ClientOp.TUPLE_GET_AND_UPSERT
+                || opCode == ClientOp.TUPLE_INSERT
+                || opCode == ClientOp.TUPLE_INSERT_ALL
+                || opCode == ClientOp.TUPLE_REPLACE
+                || opCode == ClientOp.TUPLE_REPLACE_EXACT
+                || opCode == ClientOp.TUPLE_GET_AND_REPLACE
+                || opCode == ClientOp.TUPLE_DELETE
+                || opCode == ClientOp.TUPLE_DELETE_ALL
+                || opCode == ClientOp.TUPLE_DELETE_EXACT
+                || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
+                || opCode == ClientOp.TUPLE_GET_AND_DELETE
+                || opCode == ClientOp.TUPLE_CONTAINS_KEY
+                || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
+    }
+
+    private void processOperationInternal(
+            ChannelHandlerContext ctx,
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            long requestId,
+            int opCode
+    ) {
+        out.packLong(requestId);
+        writeFlags(out, ctx, false, false);
+
+        // Observable timestamp should be calculated after the operation is 
processed; reserve space, write later.
+        int observableTimestampIdx = out.reserveLong();
+
+        CompletableFuture fut;
+
+        try {
+            fut = processOperation(in, out, opCode, requestId);
+        } catch (IgniteInternalCheckedException e) {
+            fut = CompletableFuture.failedFuture(e);
+        }
+
+        if (fut == null) {
+            // Operation completed synchronously.
+            in.close();
+            out.setLong(observableTimestampIdx, observableTimestamp(out));
+            write(out, ctx);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Client request processed synchronously [id=" + 
requestId + ", op=" + opCode
+                        + ", remoteAddress=" + ctx.channel().remoteAddress() + 
"]");
+            }
+
+            metrics.requestsProcessedIncrement();
+            metrics.requestsActiveDecrement();
+        } else {
+            var reqId = requestId;
+            var op = opCode;
+
+            fut.whenComplete((Object res, Object err) -> {
+                in.close();
+                metrics.requestsActiveDecrement();
+
+                if (err != null) {
+                    out.close();
+                    writeError(reqId, op, (Throwable) err, ctx, false);
+
+                    metrics.requestsFailedIncrement();
+                } else {
+                    out.setLong(observableTimestampIdx, 
observableTimestamp(out));
+                    write(out, ctx);
+
+                    metrics.requestsProcessedIncrement();
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Client request processed [id=" + reqId + ", 
op=" + op
+                                + ", remoteAddress=" + 
ctx.channel().remoteAddress() + "]");
+                    }
+                }
+            });
+        }
+    }
+
     private void writeFlags(ClientMessagePacker out, ChannelHandlerContext 
ctx, boolean isNotification, boolean isError) {
         // Notify the client about primary replica change that happened for 
ANY table since the last request.
         // We can't assume that the client only uses a particular table 
(e.g. the one present in the replica tracker), because
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index c08e30e02a..041287b8af 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -235,7 +235,8 @@ public class TestClientHandlerModule implements 
IgniteComponent {
                                                 clockService,
                                                 new 
AlwaysSyncedSchemaSyncService(),
                                                 new TestLowWatermark()
-                                        )
+                                        ),
+                                        Runnable::run
                                 )
                         );
                     }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index cde4d43249..2904f2c40a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -261,7 +261,8 @@ public class TestServer implements AutoCloseable {
                         new FakeCatalogService(FakeInternalTable.PARTITIONS),
                         ignite.placementDriver(),
                         clientConnectorConfiguration,
-                        new TestLowWatermark()
+                        new TestLowWatermark(),
+                        Runnable::run
                 );
 
         module.startAsync(componentContext).join();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 81833723ed..66889947fa 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1100,7 +1100,8 @@ public class IgniteImpl implements Ignite {
                 catalogManager,
                 placementDriverMgr.placementDriver(),
                 clientConnectorConfiguration,
-                lowWatermark
+                lowWatermark,
+                threadPoolsManager.partitionOperationsExecutor()
         );
 
         restComponent = createRestComponent(name);

Reply via email to