This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 229ef767e08 IGNITE-28267 Remove deprecated usages from ScaleCubeDirectMarshallerTransport (#7800)
229ef767e08 is described below

commit 229ef767e08a2ce9a56aa4ca7cb274480244b3bd
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Mar 18 14:15:13 2026 +0200

    IGNITE-28267 Remove deprecated usages from ScaleCubeDirectMarshallerTransport (#7800)
---
 .../ScaleCubeDirectMarshallerTransport.java        | 70 ++++++++--------------
 1 file changed, 26 insertions(+), 44 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index aa6a9430ddb..7222f4c2719 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -37,11 +37,9 @@ import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.Sinks;
 
 /**
  * ScaleCube transport over {@link ConnectionManager}.
@@ -54,16 +52,12 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     static final ChannelType SCALE_CUBE_CHANNEL_TYPE = new ChannelType((short) 1, "ScaleCube");
 
     /** Message subject. */
-    private final DirectProcessor<Message> subject = DirectProcessor.create();
+    private final Sinks.Many<Message> subject = Sinks.many().multicast().directBestEffort();
 
-    /** Message sink. */
-    private final FluxSink<Message> sink = subject.sink();
+    /** Mono representing the state of the stop operation. */
+    private final Mono<Void> stopMono;
 
-    /** Close handler. */
-    private final MonoProcessor<Void> stop = MonoProcessor.create();
-
-    /** On stop. */
-    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+    private volatile boolean isStopped = false;
 
     /** Connection manager. */
     private final MessagingService messagingService;
@@ -72,32 +66,30 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     private final NetworkMessagesFactory messageFactory;
 
     /** Node address. */
-    private final Address address;
+    private final Address localAddress;
 
     /**
      * Constructor.
      *
      * @param localAddress Local address.
      * @param messagingService Messaging service.
-     * @param messageFactory  Message factory.
+     * @param messageFactory Message factory.
      */
     ScaleCubeDirectMarshallerTransport(
             Address localAddress,
             MessagingService messagingService,
             NetworkMessagesFactory messageFactory
     ) {
-        this.address = localAddress;
+        this.localAddress = localAddress;
         this.messagingService = messagingService;
         this.messageFactory = messageFactory;
 
         this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, sender, correlationId) -> onMessage(message));
-        // Setup cleanup
-        stop.then(doStop())
-                .doFinally(s -> onStop.onComplete())
-                .subscribe(
-                        null,
-                        ex -> LOG.warn("Failed to stop [address={}, reason={}]", address, ex.toString())
-                );
+
+        stopMono = doStop()
+                .doOnError(ex -> LOG.warn("Failed to stop [address={}].", ex, this.localAddress))
+                .doFinally(s -> isStopped = true)
+                .cache();
     }
 
     /**
@@ -107,62 +99,53 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      */
     private Mono<Void> doStop() {
         return Mono.defer(() -> {
-            LOG.info("Stopping [address={}]", address);
+            LOG.info("Stopping ScaleCube transport [address={}]", localAddress);
 
-            sink.complete();
+            subject.tryEmitComplete();
+
+            LOG.info("Stopped ScaleCube transport [address={}]", localAddress);
 
-            LOG.info("Stopped [address={}]", address);
             return Mono.empty();
         });
     }
 
-    /** {@inheritDoc} */
     @Override
     public Address address() {
-        return address;
+        return localAddress;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Mono<Transport> start() {
         return Mono.just(this);
     }
 
-    /** {@inheritDoc} */
     @Override
     public Mono<Void> stop() {
-        return Mono.defer(() -> {
-            stop.onComplete();
-            return onStop;
-        });
+        return stopMono;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isStopped() {
-        return onStop.isDisposed();
+        return isStopped;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Mono<Void> send(Address address, Message message) {
         var addr = new NetworkAddress(address.host(), address.port());
 
-        return Mono.fromFuture(() -> {
-            return messagingService.send(addr, SCALE_CUBE_CHANNEL_TYPE, fromMessage(message));
-        });
+        return Mono.fromFuture(() -> messagingService.send(addr, SCALE_CUBE_CHANNEL_TYPE, fromMessage(message)));
     }
 
     /**
      * Handles new network messages.
      *
-     * @param msg    Network message.
+     * @param msg Network message.
      */
     private void onMessage(NetworkMessage msg) {
         Message message = fromNetworkMessage(msg);
 
         if (message != null) {
-            sink.next(message);
+            subject.tryEmitNext(message);
         }
     }
 
@@ -170,7 +153,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
      *
      * @param message ScaleCube message.
-     * @return Netowork message that wraps ScaleCube message.
+     * @return Network message that wraps ScaleCube message.
      * @throws IgniteInternalException If failed to write message to ObjectOutputStream.
      */
     private NetworkMessage fromMessage(Message message) throws IgniteInternalException {
@@ -213,10 +196,10 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
 
             return Message.withHeaders(headers).data(data).build();
         }
+
         return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Mono<Message> requestResponse(Address address, Message request) {
         return Mono.create(sink -> {
@@ -243,9 +226,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         });
     }
 
-    /** {@inheritDoc} */
     @Override
     public final Flux<Message> listen() {
-        return subject.onBackpressureBuffer();
+        return subject.asFlux().onBackpressureBuffer();
     }
 }

Reply via email to