This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3de5097fda IGNITE-23257 Switch to partition-operation thread to handle
client messages (#4559)
3de5097fda is described below
commit 3de5097fda6183a11b8ebf46671b0c9f22430d0b
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Oct 16 10:26:12 2024 +0300
IGNITE-23257 Switch to partition-operation thread to handle client messages
(#4559)
---
.../apache/ignite/client/handler/TestServer.java | 3 +-
.../ignite/client/handler/ClientHandlerModule.java | 12 +-
.../handler/ClientInboundMessageHandler.java | 163 +++++++++++++++------
.../ignite/client/TestClientHandlerModule.java | 3 +-
.../java/org/apache/ignite/client/TestServer.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
6 files changed, 135 insertions(+), 52 deletions(-)
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 4f3a5d0dc7..bef42569f2 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -138,7 +138,8 @@ public class TestServer {
mock(CatalogService.class),
mock(PlacementDriver.class),
clientConnectorConfiguration,
- new TestLowWatermark()
+ new TestLowWatermark(),
+ Runnable::run
);
module.startAsync(componentContext).join();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 14a261db95..2d95e71092 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -36,6 +36,7 @@ import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -125,6 +126,8 @@ public class ClientHandlerModule implements IgniteComponent
{
private final ClientConnectorConfiguration clientConnectorConfiguration;
+ private final Executor partitionOperationsExecutor;
+
@TestOnly
@SuppressWarnings("unused")
private volatile ChannelHandler handler;
@@ -144,6 +147,7 @@ public class ClientHandlerModule implements IgniteComponent
{
* @param clockService Clock service.
* @param clientConnectorConfiguration Configuration of the connector.
* @param lowWatermark Low watermark.
+ * @param partitionOperationsExecutor Executor for a partition operation.
*/
public ClientHandlerModule(
QueryProcessor queryProcessor,
@@ -161,7 +165,8 @@ public class ClientHandlerModule implements IgniteComponent
{
CatalogService catalogService,
PlacementDriver placementDriver,
ClientConnectorConfiguration clientConnectorConfiguration,
- LowWatermark lowWatermark
+ LowWatermark lowWatermark,
+ Executor partitionOperationsExecutor
) {
assert igniteTables != null;
assert queryProcessor != null;
@@ -178,6 +183,7 @@ public class ClientHandlerModule implements IgniteComponent
{
assert placementDriver != null;
assert clientConnectorConfiguration != null;
assert lowWatermark != null;
+ assert partitionOperationsExecutor != null;
this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
@@ -195,6 +201,7 @@ public class ClientHandlerModule implements IgniteComponent
{
this.primaryReplicaTracker = new
ClientPrimaryReplicaTracker(placementDriver, catalogService, clockService,
schemaSyncService,
lowWatermark);
this.clientConnectorConfiguration = clientConnectorConfiguration;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
}
/** {@inheritDoc} */
@@ -384,7 +391,8 @@ public class ClientHandlerModule implements IgniteComponent
{
schemaSyncService,
catalogService,
connectionId,
- primaryReplicaTracker
+ primaryReplicaTracker,
+ partitionOperationsExecutor
);
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 2f17bd03c7..8290f9d423 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -211,6 +212,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
private final long connectionId;
+ private final Executor partitionOperationsExecutor;
+
/**
* Constructor.
*
@@ -239,7 +242,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
SchemaSyncService schemaSyncService,
CatalogService catalogService,
long connectionId,
- ClientPrimaryReplicaTracker primaryReplicaTracker
+ ClientPrimaryReplicaTracker primaryReplicaTracker,
+ Executor partitionOperationsExecutor
) {
assert igniteTables != null;
assert igniteTransactions != null;
@@ -254,6 +258,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
assert schemaSyncService != null;
assert catalogService != null;
assert primaryReplicaTracker != null;
+ assert partitionOperationsExecutor != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
@@ -266,6 +271,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
this.authenticationManager = authenticationManager;
this.clockService = clockService;
this.primaryReplicaTracker = primaryReplicaTracker;
+ this.partitionOperationsExecutor = partitionOperationsExecutor;
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(
@@ -310,13 +316,12 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
var unpacker = new ClientMessageUnpacker(byteBuf);
metrics.bytesReceivedAdd(byteBuf.readableBytes() +
ClientMessageCommon.HEADER_SIZE);
- // Packer buffer is released by Netty on send, or by inner exception
handlers below.
- var packer = getPacker(ctx.alloc());
-
switch (state) {
case STATE_BEFORE_HANDSHAKE:
state = STATE_HANDSHAKE_REQUESTED;
metrics.bytesReceivedAdd(ClientMessageCommon.MAGIC_BYTES.length);
+ // Packer buffer is released by Netty on send, or by inner
exception handlers below.
+ var packer = getPacker(ctx.alloc());
handshake(ctx, unpacker, packer);
break;
@@ -327,7 +332,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
case STATE_HANDSHAKE_RESPONSE_SENT:
assert clientContext != null : "Client context != null";
- processOperation(ctx, unpacker, packer);
+ processOperation(ctx, unpacker);
break;
@@ -582,9 +587,11 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
return new ClientMessagePacker(alloc.buffer());
}
- private void processOperation(ChannelHandlerContext ctx,
ClientMessageUnpacker in, ClientMessagePacker out) {
+ private void processOperation(ChannelHandlerContext ctx,
ClientMessageUnpacker in) {
long requestId = -1;
int opCode = -1;
+ ClientMessagePacker out = null;
+
metrics.requestsActiveIncrement();
try {
@@ -596,56 +603,37 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
+ ", remoteAddress=" + ctx.channel().remoteAddress() +
"]");
}
- out.packLong(requestId);
- writeFlags(out, ctx, false, false);
-
- // Observable timestamp should be calculated after the operation
is processed; reserve space, write later.
- int observableTimestampIdx = out.reserveLong();
-
- CompletableFuture fut = processOperation(in, out, opCode,
requestId);
-
- if (fut == null) {
- // Operation completed synchronously.
- in.close();
- out.setLong(observableTimestampIdx, observableTimestamp(out));
- write(out, ctx);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Client request processed synchronously [id=" +
requestId + ", op=" + opCode
- + ", remoteAddress=" +
ctx.channel().remoteAddress() + "]");
- }
+ if (isPartitionOperation(opCode)) {
+ long requestId0 = requestId;
+ int opCode0 = opCode;
- metrics.requestsProcessedIncrement();
- metrics.requestsActiveDecrement();
- } else {
- var reqId = requestId;
- var op = opCode;
+ partitionOperationsExecutor.execute(() -> {
+ // Packer buffer is released by Netty on send, or by inner
exception handlers below.
+ var outPacker = getPacker(ctx.alloc());
- fut.whenComplete((Object res, Object err) -> {
- in.close();
- metrics.requestsActiveDecrement();
+ try {
+ processOperationInternal(ctx, in, outPacker,
requestId0, opCode0);
+ } catch (Throwable t) {
+ in.close();
+ outPacker.close();
- if (err != null) {
- out.close();
- writeError(reqId, op, (Throwable) err, ctx, false);
+ writeError(requestId0, opCode0, t, ctx, false);
metrics.requestsFailedIncrement();
- } else {
- out.setLong(observableTimestampIdx,
observableTimestamp(out));
- write(out, ctx);
-
- metrics.requestsProcessedIncrement();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Client request processed [id=" + reqId
+ ", op=" + op
- + ", remoteAddress=" +
ctx.channel().remoteAddress() + "]");
- }
}
});
+ } else {
+ // Packer buffer is released by Netty on send, or by inner
exception handlers below.
+ out = getPacker(ctx.alloc());
+
+ processOperationInternal(ctx, in, out, requestId, opCode);
}
} catch (Throwable t) {
in.close();
- out.close();
+
+ if (out != null) {
+ out.close();
+ }
writeError(requestId, opCode, t, ctx, false);
@@ -831,6 +819,89 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
}
}
+ private boolean isPartitionOperation(int opCode) {
+ return opCode == ClientOp.TABLES_GET
+ || opCode == ClientOp.TUPLE_UPSERT
+ || opCode == ClientOp.TUPLE_GET
+ || opCode == ClientOp.TUPLE_UPSERT_ALL
+ || opCode == ClientOp.TUPLE_GET_ALL
+ || opCode == ClientOp.TUPLE_GET_AND_UPSERT
+ || opCode == ClientOp.TUPLE_INSERT
+ || opCode == ClientOp.TUPLE_INSERT_ALL
+ || opCode == ClientOp.TUPLE_REPLACE
+ || opCode == ClientOp.TUPLE_REPLACE_EXACT
+ || opCode == ClientOp.TUPLE_GET_AND_REPLACE
+ || opCode == ClientOp.TUPLE_DELETE
+ || opCode == ClientOp.TUPLE_DELETE_ALL
+ || opCode == ClientOp.TUPLE_DELETE_EXACT
+ || opCode == ClientOp.TUPLE_DELETE_ALL_EXACT
+ || opCode == ClientOp.TUPLE_GET_AND_DELETE
+ || opCode == ClientOp.TUPLE_CONTAINS_KEY
+ || opCode == ClientOp.TUPLE_CONTAINS_ALL_KEYS;
+ }
+
+ private void processOperationInternal(
+ ChannelHandlerContext ctx,
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ long requestId,
+ int opCode
+ ) {
+ out.packLong(requestId);
+ writeFlags(out, ctx, false, false);
+
+ // Observable timestamp should be calculated after the operation is
processed; reserve space, write later.
+ int observableTimestampIdx = out.reserveLong();
+
+ CompletableFuture fut;
+
+ try {
+ fut = processOperation(in, out, opCode, requestId);
+ } catch (IgniteInternalCheckedException e) {
+ fut = CompletableFuture.failedFuture(e);
+ }
+
+ if (fut == null) {
+ // Operation completed synchronously.
+ in.close();
+ out.setLong(observableTimestampIdx, observableTimestamp(out));
+ write(out, ctx);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Client request processed synchronously [id=" +
requestId + ", op=" + opCode
+ + ", remoteAddress=" + ctx.channel().remoteAddress() +
"]");
+ }
+
+ metrics.requestsProcessedIncrement();
+ metrics.requestsActiveDecrement();
+ } else {
+ var reqId = requestId;
+ var op = opCode;
+
+ fut.whenComplete((Object res, Object err) -> {
+ in.close();
+ metrics.requestsActiveDecrement();
+
+ if (err != null) {
+ out.close();
+ writeError(reqId, op, (Throwable) err, ctx, false);
+
+ metrics.requestsFailedIncrement();
+ } else {
+ out.setLong(observableTimestampIdx,
observableTimestamp(out));
+ write(out, ctx);
+
+ metrics.requestsProcessedIncrement();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Client request processed [id=" + reqId + ",
op=" + op
+ + ", remoteAddress=" +
ctx.channel().remoteAddress() + "]");
+ }
+ }
+ });
+ }
+ }
+
private void writeFlags(ClientMessagePacker out, ChannelHandlerContext
ctx, boolean isNotification, boolean isError) {
// Notify the client about primary replica change that happened for
ANY table since the last request.
// We can't assume that the client only uses uses a particular table
(e.g. the one present in the replica tracker), because
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index c08e30e02a..041287b8af 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -235,7 +235,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
clockService,
new
AlwaysSyncedSchemaSyncService(),
new TestLowWatermark()
- )
+ ),
+ Runnable::run
)
);
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index cde4d43249..2904f2c40a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -261,7 +261,8 @@ public class TestServer implements AutoCloseable {
new FakeCatalogService(FakeInternalTable.PARTITIONS),
ignite.placementDriver(),
clientConnectorConfiguration,
- new TestLowWatermark()
+ new TestLowWatermark(),
+ Runnable::run
);
module.startAsync(componentContext).join();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 81833723ed..66889947fa 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1100,7 +1100,8 @@ public class IgniteImpl implements Ignite {
catalogManager,
placementDriverMgr.placementDriver(),
clientConnectorConfiguration,
- lowWatermark
+ lowWatermark,
+ threadPoolsManager.partitionOperationsExecutor()
);
restComponent = createRestComponent(name);