[
https://issues.apache.org/jira/browse/IGNITE-24224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Petrov updated IGNITE-24224:
------------------------------------
Ignite Flags: Release Notes Required (was: Docs Required,Release Notes
Required)
> Broken serialization of communication messages in cluster with SSL enabled.
> ---------------------------------------------------------------------------
>
> Key: IGNITE-24224
> URL: https://issues.apache.org/jira/browse/IGNITE-24224
> Project: Ignite
> Issue Type: Bug
> Reporter: Mikhail Petrov
> Priority: Blocker
>
> The following code can serve as a reproducer of the problem.
> Its main goal is to overload Ignite with communication messages.
> In this particular case it floods the cluster with GridJobExecuteRequest and
> GridJobExecuteResponse messages.
> {code:java}
> /** */
> public class BrokenSerializationTest extends AbstractSecurityTest {
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
>         SslContextFactory factory = (SslContextFactory)sslTrustedFactory("server", "trustone");
>
>         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
>             .setSslContextFactory(factory)
>             .setFailureHandler(new StopNodeOrHaltFailureHandler());
>
>         // Small values of this property cause Ignite to frequently reestablish connections between nodes.
>         cfg.setCommunicationSpi(new TcpCommunicationSpi().setUnacknowledgedMessagesBufferSize(160));
>
>         return cfg;
>     }
>
>     /** Tests task execution security context in case task was initiated from the {@link IgniteClient}. */
>     @Test
>     public void test() throws Exception {
>         IgniteEx ignite = startGridAllowAll("crd");
>
>         startGridAllowAll("srv1");
>         startGridAllowAll("srv2");
>         startGridAllowAll("srv3");
>         startGridAllowAll("srv4");
>         startGridAllowAll("srv5");
>
>         awaitPartitionMapExchange();
>
>         Set<Throwable> errors = ConcurrentHashMap.newKeySet();
>         Set<ComputeTaskFuture<Integer>> futs = ConcurrentHashMap.newKeySet();
>
>         CountDownLatch latch = new CountDownLatch(16 * 10000);
>
>         // 16 concurrent submitters, 10000 tasks each, to flood the communication layer.
>         for (int i = 0; i < 16; i++) {
>             GridTestUtils.runAsync(() -> {
>                 try {
>                     U.sleep(1000);
>
>                     for (int j = 0; j < 10000; j++) {
>                         futs.add(ignite.context().task().<Void, Integer>execute(
>                             TestTask.class.getName(),
>                             null,
>                             TaskExecutionOptions.options().asPublicRequest().withProjection(ignite.cluster().nodes())
>                         ).publicFuture());
>
>                         latch.countDown();
>                     }
>                 }
>                 catch (Throwable e) {
>                     errors.add(e);
>                 }
>             });
>         }
>
>         latch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
>
>         assertTrue(errors.isEmpty());
>
>         for (ComputeTaskFuture<Integer> fut : futs)
>             assertNull(fut.get(getTestTimeout()));
>     }
>
>     /** */
>     public static class TestTask implements ComputeTask<Void, Integer> {
>         /** {@inheritDoc} */
>         @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
>             List<ClusterNode> subgrid,
>             @Nullable Void arg
>         ) throws IgniteException {
>             Map<ComputeJob, ClusterNode> res = new HashMap<>();
>
>             // One no-op job per node of the projection.
>             for (ClusterNode node : subgrid) {
>                 res.put(new ComputeJob() {
>                     @Override public void cancel() {
>                         // No-op.
>                     }
>
>                     @Override public Object execute() throws IgniteException {
>                         return null;
>                     }
>                 }, node);
>             }
>
>             return res;
>         }
>
>         /** {@inheritDoc} */
>         @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
>             if (res.getException() != null)
>                 throw new IgniteException(res.getException());
>
>             return ComputeJobResultPolicy.WAIT;
>         }
>
>         /** {@inheritDoc} */
>         @Override public @Nullable Integer reduce(List<ComputeJobResult> results) throws IgniteException {
>             assertEquals(allGrids().size(), results.size());
>
>             return null;
>         }
>     }
> }
> {code}
> It does not reproduce the problem reliably, because the problem occurs only
> when just part of a message is read into the application buffer
> while reading from the socket.
> While this reproducer is running, you can observe various errors related
> to communication message handling, e.g.:
> {code:java}
> [2025-01-16T10:10:11,136][ERROR][grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%][TcpCommunicationSpi]
> Failed to process selector key [ses=GridSelectorNioSessionImpl
> [worker=DirectNioClientWorker [super=AbstractNioClientWorker [idx=3,
> bytesRcvd=10261, bytesSent=0, bytesRcvd0=0, bytesSent0=0, select=true,
> super=GridWorker [name=grid-nio-worker-tcp-comm-3,
> igniteInstanceName=TcpCommunicationSpi, finished=false,
> heartbeatTs=1737011410109, hashCode=520079162, interrupted=false,
> runner=grid-nio-worker-tcp-comm-3-#43%TcpCommunicationSpi%]]],
> writeBuf=java.nio.DirectByteBuffer[pos=0 lim=32768 cap=32768],
> readBuf=java.nio.DirectByteBuffer[pos=2398 lim=2398 cap=32768],
> inRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630, rcvCnt=168,
> sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false,
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004,
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false,
> ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false,
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false],
> outRecovery=GridNioRecoveryDescriptor [acked=196, resendCnt=13630,
> rcvCnt=168, sentCnt=13826, reserved=true, lastAck=160, nodeLeft=false,
> node=TcpDiscoveryNode [id=4062535d-f6a8-4ba3-ab95-ace1508ad004,
> consistentId=127.0.0.1:47504, addrs=ArrayList [127.0.0.1], sockAddrs=HashSet
> [/127.0.0.1:47504], discPort=47504, order=5, intOrder=5, loc=false,
> ver=2.18.0#20250116-sha1:00000000, isClient=false], connected=false,
> connectCnt=1, queueLimit=160, reserveCnt=2, pairedConnections=false],
> closeSocket=true,
> outboundMessagesQueueSizeMetric=o.a.i.i.processors.metric.impl.LongAdderMetric@69a257d1,
> super=GridNioSessionImpl [locAddr=/127.0.0.1:48568,
> rmtAddr=/127.0.0.1:47104, createTime=1737011410099, closeTime=0, bytesSent=0,
> bytesRcvd=2398, bytesSent0=0, bytesRcvd0=0, sndSchedTime=1737011410099,
> lastSndTime=1737011410109, lastRcvTime=1737011410109, readsPaused=false,
> filterChain=FilterChain[filters=[GridNioCodecFilter
> [parser=o.a.i.i.util.nio.GridDirectParser@49fbc83f, directMode=true],
> GridConnectionBytesVerifyFilter, SSL filter], accepted=false,
> markedForClose=false]]]
> org.apache.ignite.IgniteException: Invalid message type: 512
>     at org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl.create(IgniteMessageFactoryImpl.java:104) ~[classes/:?]
>     at org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper$2.create(GridNioServerWrapper.java:811) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridDirectParser.decode(GridDirectParser.java:81) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioCodecFilter.onMessageReceived(GridNioCodecFilter.java:113) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter.onMessageReceived(GridConnectionBytesVerifyFilter.java:88) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.onMessageReceived(GridNioSslFilter.java:413) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:109) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$HeadFilter.onMessageReceived(GridNioServer.java:3752) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioFilterChain.onMessageReceived(GridNioFilterChain.java:175) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$DirectNioClientWorker.processRead(GridNioServer.java:1379) ~[classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.processSelectedKeysOptimized(GridNioServer.java:2526) [classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.bodyInternal(GridNioServer.java:2281) [classes/:?]
>     at org.apache.ignite.internal.util.nio.GridNioServer$AbstractNioClientWorker.body(GridNioServer.java:1910) [classes/:?]
>     at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:125) [classes/:?]
>     at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
> Alternatively, you can see NullPointerExceptions, assertion errors and so on.
> In most cases these errors only lead to messages being resent and connections
> between nodes being reestablished. But in some cases the failure handler (FH)
> can be invoked, with subsequent node termination.
> The main reason for the described behavior:
> 1. Node0 tries to establish a connection with node1.
> 2. During the handshake, BlockingSslHandler is used to store, encrypt and
> decrypt data received over the network. It stores the decrypted data in a
> dedicated buffer, BlockingSslHandler#appBuf.
> 3. Let's assume node1 sends the handshake finish message and some regular
> communication messages in a single network packet.
> 4. Node0 decrypts this packet and writes it to the above-mentioned
> BlockingSslHandler#appBuf. So now the buffer stores the bytes of the
> handshake finish message and of some regular messages.
> 5. Node0 finishes the handshake and creates GridNioSslHandler to handle
> communication between nodes in the NIO way. GridNioSslHandler has a separate
> set of buffers to store, encrypt and decrypt data received over the network.
> 6. After the handshake is finished, node0 tries to decode and handle all bytes
> remaining in BlockingSslHandler#appBuf (see
> org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java:262) and then
> discards the BlockingSslHandler.
> 7. If node1 sent only part of the last regular message in step 3, the bytes
> related to this message will not be processed during step 6 and will remain
> in BlockingSslHandler#appBuf.
> 8. So when node1 sends the remaining part of the partially written message
> from step 3, it will be processed on node0 by GridNioSslHandler from scratch,
> and the bytes left in BlockingSslHandler#appBuf will be lost.
> This will result in the deserialization errors mentioned above.
> Note, https://issues.apache.org/jira/browse/IGNITE-22375 can greatly increase
> the probability of the described problem.
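
The loss described in steps 7 and 8 can be illustrated with a small,
self-contained sketch. It is a toy model, not Ignite code: the class
HandoffSketch, its methods, and the frame layout (1-byte type, 1-byte length,
payload) are all assumptions made up for the illustration. The first buffer
plays the role of BlockingSslHandler#appBuf, the second one the role of the
buffers owned by GridNioSslHandler.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the buffer handoff. None of these names are Ignite APIs. */
public class HandoffSketch {
    /** The only valid message type in this hypothetical framing. */
    static final int MSG_TYPE = 1;

    /** Hypothetical frame layout: 1-byte type, 1-byte payload length, payload. */
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.US_ASCII);

        return ByteBuffer.allocate(2 + body.length)
            .put((byte)MSG_TYPE).put((byte)body.length).put(body).array();
    }

    /** Concatenates two byte arrays. */
    static byte[] concat(byte[] a, byte[] b) {
        return ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();
    }

    /** Decodes every complete frame in buf; an incomplete tail stays unread. */
    static void decode(ByteBuffer buf, List<String> out) {
        while (buf.remaining() >= 2) {
            buf.mark();

            int type = buf.get() & 0xFF;

            if (type != MSG_TYPE)
                throw new IllegalStateException("Invalid message type: " + type);

            int len = buf.get() & 0xFF;

            if (buf.remaining() < len) {
                buf.reset(); // Partial frame: rewind and wait for more bytes.

                return;
            }

            byte[] body = new byte[len];

            buf.get(body);
            out.add(new String(body, StandardCharsets.US_ASCII));
        }
    }

    /** Receives two packets; carryOver decides whether the undecoded tail survives the handoff. */
    static List<String> receive(byte[] packet1, byte[] packet2, boolean carryOver) {
        List<String> msgs = new ArrayList<>();

        // Steps 4 and 6: the handshake-time buffer holds packet1; decode what is complete.
        ByteBuffer appBuf = ByteBuffer.wrap(packet1);

        decode(appBuf, msgs);

        // Step 5: the NIO-mode handler has its own, initially empty buffer.
        ByteBuffer nioBuf = ByteBuffer.allocate(64);

        if (carryOver)
            nioBuf.put(appBuf); // Keep the partial frame left over from the handshake.

        // Step 8: the rest of the partially sent message arrives.
        nioBuf.put(packet2);
        nioBuf.flip();

        decode(nioBuf, msgs);

        return msgs;
    }

    public static void main(String[] args) {
        byte[] stream = concat(frame("first"), frame("second"));

        // The packet boundary falls in the middle of the second frame.
        byte[] packet1 = Arrays.copyOfRange(stream, 0, 10);
        byte[] packet2 = Arrays.copyOfRange(stream, 10, stream.length);

        System.out.println(receive(packet1, packet2, true)); // [first, second]

        try {
            receive(packet1, packet2, false);
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Invalid message type: 101
        }
    }
}
```

When the leftover bytes are dropped, the second decoder starts in the middle
of a frame and interprets a payload byte as the message type, which is the
same kind of failure as the "Invalid message type" error in the log above.
Carrying the unread tail over to the new buffer is only one way to model a
fix; the sketch does not claim to show how the issue is resolved in Ignite.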