This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 229ef767e08 IGNITE-28267 Remove deprecated usages from
ScaleCubeDirectMarshallerTransport (#7800)
229ef767e08 is described below
commit 229ef767e08a2ce9a56aa4ca7cb274480244b3bd
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Mar 18 14:15:13 2026 +0200
IGNITE-28267 Remove deprecated usages from
ScaleCubeDirectMarshallerTransport (#7800)
---
.../ScaleCubeDirectMarshallerTransport.java | 70 ++++++++--------------
1 file changed, 26 insertions(+), 44 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index aa6a9430ddb..7222f4c2719 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -37,11 +37,9 @@ import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.Disposables;
-import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.Sinks;
/**
* ScaleCube transport over {@link ConnectionManager}.
@@ -54,16 +52,12 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
static final ChannelType SCALE_CUBE_CHANNEL_TYPE = new ChannelType((short)
1, "ScaleCube");
/** Message subject. */
- private final DirectProcessor<Message> subject = DirectProcessor.create();
+ private final Sinks.Many<Message> subject = Sinks.many().multicast().directBestEffort();
- /** Message sink. */
- private final FluxSink<Message> sink = subject.sink();
+ /** Mono representing the state of the stop operation. */
+ private final Mono<Void> stopMono;
- /** Close handler. */
- private final MonoProcessor<Void> stop = MonoProcessor.create();
-
- /** On stop. */
- private final MonoProcessor<Void> onStop = MonoProcessor.create();
+ private volatile boolean isStopped = false;
/** Connection manager. */
private final MessagingService messagingService;
@@ -72,32 +66,30 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
private final NetworkMessagesFactory messageFactory;
/** Node address. */
- private final Address address;
+ private final Address localAddress;
/**
* Constructor.
*
* @param localAddress Local address.
* @param messagingService Messaging service.
- * @param messageFactory Message factory.
+ * @param messageFactory Message factory.
*/
ScaleCubeDirectMarshallerTransport(
Address localAddress,
MessagingService messagingService,
NetworkMessagesFactory messageFactory
) {
- this.address = localAddress;
+ this.localAddress = localAddress;
this.messagingService = messagingService;
this.messageFactory = messageFactory;
this.messagingService.addMessageHandler(NetworkMessageTypes.class,
(message, sender, correlationId) -> onMessage(message));
- // Setup cleanup
- stop.then(doStop())
- .doFinally(s -> onStop.onComplete())
- .subscribe(
- null,
- ex -> LOG.warn("Failed to stop [address={}, reason={}]", address, ex.toString())
- );
+
+ stopMono = doStop()
+ .doOnError(ex -> LOG.warn("Failed to stop [address={}].", ex, this.localAddress))
+ .doFinally(s -> isStopped = true)
+ .cache();
}
/**
@@ -107,62 +99,53 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
*/
private Mono<Void> doStop() {
return Mono.defer(() -> {
- LOG.info("Stopping [address={}]", address);
+ LOG.info("Stopping ScaleCube transport [address={}]", localAddress);
- sink.complete();
+ subject.tryEmitComplete();
+
+ LOG.info("Stopped ScaleCube transport [address={}]", localAddress);
- LOG.info("Stopped [address={}]", address);
return Mono.empty();
});
}
- /** {@inheritDoc} */
@Override
public Address address() {
- return address;
+ return localAddress;
}
- /** {@inheritDoc} */
@Override
public Mono<Transport> start() {
return Mono.just(this);
}
- /** {@inheritDoc} */
@Override
public Mono<Void> stop() {
- return Mono.defer(() -> {
- stop.onComplete();
- return onStop;
- });
+ return stopMono;
}
- /** {@inheritDoc} */
@Override
public boolean isStopped() {
- return onStop.isDisposed();
+ return isStopped;
}
- /** {@inheritDoc} */
@Override
public Mono<Void> send(Address address, Message message) {
var addr = new NetworkAddress(address.host(), address.port());
- return Mono.fromFuture(() -> {
- return messagingService.send(addr, SCALE_CUBE_CHANNEL_TYPE, fromMessage(message));
- });
+ return Mono.fromFuture(() -> messagingService.send(addr, SCALE_CUBE_CHANNEL_TYPE, fromMessage(message)));
}
/**
* Handles new network messages.
*
- * @param msg Network message.
+ * @param msg Network message.
*/
private void onMessage(NetworkMessage msg) {
Message message = fromNetworkMessage(msg);
if (message != null) {
- sink.next(message);
+ subject.tryEmitNext(message);
}
}
@@ -170,7 +153,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
* Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
*
* @param message ScaleCube message.
- * @return Netowork message that wraps ScaleCube message.
+ * @return Network message that wraps ScaleCube message.
* @throws IgniteInternalException If failed to write message to
ObjectOutputStream.
*/
private NetworkMessage fromMessage(Message message) throws
IgniteInternalException {
@@ -213,10 +196,10 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
return Message.withHeaders(headers).data(data).build();
}
+
return null;
}
- /** {@inheritDoc} */
@Override
public Mono<Message> requestResponse(Address address, Message request) {
return Mono.create(sink -> {
@@ -243,9 +226,8 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
});
}
- /** {@inheritDoc} */
@Override
public final Flux<Message> listen() {
- return subject.onBackpressureBuffer();
+ return subject.asFlux().onBackpressureBuffer();
}
}