This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new db27ff1ada1 IGNITE-28247 Update ScaleCube to 2.7.7 (#7796)
db27ff1ada1 is described below
commit db27ff1ada1c5d55f0ac924b161c9dded7c3f97c
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Mar 17 10:49:05 2026 +0200
IGNITE-28247 Update ScaleCube to 2.7.7 (#7796)
---
gradle/libs.versions.toml | 2 +-
.../network/scalecube/ScaleCubeClusterService.java | 20 ++++---------
.../ScaleCubeDirectMarshallerTransport.java | 34 +++++++---------------
.../scalecube/ScaleCubeTopologyService.java | 9 ++++--
.../ScaleCubeDirectMarshallerTransportTest.java | 5 ++--
.../scalecube/ScaleCubeTopologyServiceTest.java | 13 ++++-----
6 files changed, 30 insertions(+), 53 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9387fb0f589..027ef01bf9e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -60,7 +60,7 @@ hamcrest = "3.0"
hamcrestOptional = "2.0.0"
hamcrestPath = "1.0.1"
hamcrestJson = "0.3"
-scalecube = "2.6.15"
+scalecube = "2.7.7"
calcite = "1.41.0"
value = "2.12.1"
janino = "3.1.12"
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
index 62a4fb881f7..31e92094eb8 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.scalecube;
import static io.scalecube.cluster.membership.MembershipEvent.createAdded;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Network.ADDRESS_UNRESOLVED_ERR;
@@ -27,12 +28,10 @@ import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.metadata.MetadataCodec;
-import io.scalecube.net.Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -40,7 +39,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -291,13 +289,15 @@ public class ScaleCubeClusterService implements
ClusterService {
}
private ClusterImpl createCluster() {
- var transport = new
ScaleCubeDirectMarshallerTransport(parseAddress(localNode.address()),
messagingService, messageFactory);
+ var transport = new
ScaleCubeDirectMarshallerTransport(localNode.address(), messagingService,
messageFactory);
+
+ List<String> seedMembers =
nodeFinder.findNodes().stream().map(NetworkAddress::toString).collect(toList());
ClusterConfig clusterConfig =
clusterConfig(config.membership().value())
.memberId(localNode.id().toString())
.memberAlias(localNode.name())
.transport(opts -> opts.transportFactory(transportConfig ->
transport))
- .membership(opts ->
opts.seedMembers(parseAddresses(nodeFinder.findNodes())))
+ .membership(opts -> opts.seedMembers(seedMembers))
.metadataCodec(METADATA_CODEC);
return new ClusterImpl(clusterConfig)
@@ -351,16 +351,6 @@ public class ScaleCubeClusterService implements
ClusterService {
return new
UserObjectSerializationContext(userObjectDescriptorRegistry,
userObjectDescriptorFactory, userObjectMarshaller);
}
- private static List<Address> parseAddresses(Collection<NetworkAddress>
addresses) {
- return addresses.stream()
- .map(ScaleCubeClusterService::parseAddress)
- .collect(Collectors.toList());
- }
-
- private static Address parseAddress(NetworkAddress address) {
- return Address.create(address.host(), address.port());
- }
-
private void shutdownCluster() {
ClusterImpl cluster = this.cluster;
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index aa6a9430ddb..f788fabffd6 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.network.scalecube;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
-import io.scalecube.net.Address;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -72,7 +71,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport
{
private final NetworkMessagesFactory messageFactory;
/** Node address. */
- private final Address address;
+ private final String localAddress;
/**
* Constructor.
@@ -82,11 +81,11 @@ class ScaleCubeDirectMarshallerTransport implements
Transport {
* @param messageFactory Message factory.
*/
ScaleCubeDirectMarshallerTransport(
- Address localAddress,
+ NetworkAddress localAddress,
MessagingService messagingService,
NetworkMessagesFactory messageFactory
) {
- this.address = localAddress;
+ this.localAddress = localAddress.toString();
this.messagingService = messagingService;
this.messageFactory = messageFactory;
@@ -96,7 +95,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport
{
.doFinally(s -> onStop.onComplete())
.subscribe(
null,
- ex -> LOG.warn("Failed to stop [address={},
reason={}]", address, ex.toString())
+ ex -> LOG.warn("Failed to stop [address={},
reason={}]", this.localAddress, ex.toString())
);
}
@@ -107,28 +106,25 @@ class ScaleCubeDirectMarshallerTransport implements
Transport {
*/
private Mono<Void> doStop() {
return Mono.defer(() -> {
- LOG.info("Stopping [address={}]", address);
+ LOG.info("Stopping [address={}]", localAddress);
sink.complete();
- LOG.info("Stopped [address={}]", address);
+ LOG.info("Stopped [address={}]", localAddress);
return Mono.empty();
});
}
- /** {@inheritDoc} */
@Override
- public Address address() {
- return address;
+ public String address() {
+ return localAddress;
}
- /** {@inheritDoc} */
@Override
public Mono<Transport> start() {
return Mono.just(this);
}
- /** {@inheritDoc} */
@Override
public Mono<Void> stop() {
return Mono.defer(() -> {
@@ -137,20 +133,14 @@ class ScaleCubeDirectMarshallerTransport implements
Transport {
});
}
- /** {@inheritDoc} */
@Override
public boolean isStopped() {
return onStop.isDisposed();
}
- /** {@inheritDoc} */
@Override
- public Mono<Void> send(Address address, Message message) {
- var addr = new NetworkAddress(address.host(), address.port());
-
- return Mono.fromFuture(() -> {
- return messagingService.send(addr, SCALE_CUBE_CHANNEL_TYPE,
fromMessage(message));
- });
+ public Mono<Void> send(String address, Message message) {
+ return Mono.fromFuture(() ->
messagingService.send(NetworkAddress.from(address), SCALE_CUBE_CHANNEL_TYPE,
fromMessage(message)));
}
/**
@@ -216,9 +206,8 @@ class ScaleCubeDirectMarshallerTransport implements
Transport {
return null;
}
- /** {@inheritDoc} */
@Override
- public Mono<Message> requestResponse(Address address, Message request) {
+ public Mono<Message> requestResponse(String address, Message request) {
return Mono.create(sink -> {
Objects.requireNonNull(request, "request must be not null");
Objects.requireNonNull(request.correlationId(), "correlationId
must be not null");
@@ -243,7 +232,6 @@ class ScaleCubeDirectMarshallerTransport implements
Transport {
});
}
- /** {@inheritDoc} */
@Override
public final Flux<Message> listen() {
return subject.onBackpressureBuffer();
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
index f9949236a6c..5e57a949989 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
@@ -320,9 +320,12 @@ final class ScaleCubeTopologyService extends
AbstractTopologyService {
* @return Cluster node.
*/
private static InternalClusterNode fromMember(Member member, @Nullable
NodeMetadata nodeMetadata) {
- var addr = new NetworkAddress(member.address().host(),
member.address().port());
-
- return new ClusterNodeImpl(UUID.fromString(member.id()),
member.alias(), addr, nodeMetadata);
+ return new ClusterNodeImpl(
+ UUID.fromString(member.id()),
+ member.alias(),
+ NetworkAddress.from(member.address()),
+ nodeMetadata
+ );
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransportTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransportTest.java
index 814ea1d315c..3e95895bbce 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransportTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeDirectMarshallerTransportTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
import io.scalecube.cluster.transport.api.Message;
-import io.scalecube.net.Address;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -47,7 +46,7 @@ class ScaleCubeDirectMarshallerTransportTest extends
BaseIgniteAbstractTest {
@BeforeEach
void createAndStartTransport() {
transport = new ScaleCubeDirectMarshallerTransport(
- Address.create("localhost", 3000),
+ new NetworkAddress("localhost", 3000),
messagingService,
new NetworkMessagesFactory()
);
@@ -66,7 +65,7 @@ class ScaleCubeDirectMarshallerTransportTest extends
BaseIgniteAbstractTest {
void transportSendsByAddress() {
when(messagingService.send(any(NetworkAddress.class), any(),
any())).thenReturn(nullCompletedFuture());
- CompletableFuture<Void> future =
transport.send(Address.create("localhost", 3001),
Message.withData("test").build()).toFuture();
+ CompletableFuture<Void> future = transport.send("localhost:3001",
Message.withData("test").build()).toFuture();
assertThat(future, willCompleteSuccessfully());
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyServiceTest.java
index b3290cc0e7d..71cdcd854d9 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyServiceTest.java
@@ -32,7 +32,6 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
-import io.scalecube.net.Address;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -50,8 +49,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
class ScaleCubeTopologyServiceTest {
private final ScaleCubeTopologyService topologyService = new
ScaleCubeTopologyService();
- private final Member member1 = new Member(new UUID(0, 1).toString(),
"first", Address.create("host", 1001), "default");
- private final Member member2 = new Member(new UUID(0, 2).toString(),
"second", Address.create("host", 1002), "default");
+ private final Member member1 = new Member(new UUID(0, 1).toString(),
"first", "host:1001", "default");
+ private final Member member2 = new Member(new UUID(0, 2).toString(),
"second", "host:1002", "default");
@Test
void addedEventAddsNodeToTopology() {
@@ -96,7 +95,7 @@ class ScaleCubeTopologyServiceTest {
void getByAddressWorksWithConcurrentModifications(@InjectExecutorService
ExecutorService executor) {
testGetNodeWorksWithConcurrentModifications(
executor,
- member -> topologyService.getByAddress(new
NetworkAddress(member.address().host(), member.address().port()))
+ member ->
topologyService.getByAddress(NetworkAddress.from(member.address()))
);
}
@@ -109,7 +108,7 @@ class ScaleCubeTopologyServiceTest {
@InjectExecutorService ExecutorService executor,
Function<Member, InternalClusterNode> getter
) {
- Member member = new Member(randomUUID().toString(), "test",
Address.create("host", 1001), "default");
+ Member member = new Member(randomUUID().toString(), "test",
"host:1001", "default");
AtomicBoolean proceed = new AtomicBoolean(true);
@@ -146,9 +145,7 @@ class ScaleCubeTopologyServiceTest {
InternalClusterNode firstByConsistentId =
topologyService.getByConsistentId("first");
assertThatMatchesFirstMemberNewVersion(firstByConsistentId,
member1NewVersion);
- InternalClusterNode firstByAddress = topologyService.getByAddress(
- new NetworkAddress(member1.address().host(),
member1.address().port())
- );
+ InternalClusterNode firstByAddress =
topologyService.getByAddress(NetworkAddress.from(member1.address()));
assertThatMatchesFirstMemberNewVersion(firstByAddress,
member1NewVersion);
}