This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a139595609 [improve] improve hazelcast joiner, lite node can't be
election as master (#8261)
a139595609 is described below
commit a1395956097b7092b65e3e2420d086ef371b344c
Author: Jarvis <[email protected]>
AuthorDate: Fri Dec 13 20:48:45 2024 +0800
[improve] improve hazelcast joiner, lite node can't be election as master
(#8261)
---
.../client/SeaTunnelEngineClusterRoleTest.java | 57 +++++++
.../engine/server/SeaTunnelNodeContext.java | 22 ++-
.../joiner/LiteNodeDropOutDiscoveryJoiner.java | 137 +++++++++++++++++
.../joiner/LiteNodeDropOutMulticastJoiner.java | 167 +++++++++++++++++++++
.../{ => joiner}/LiteNodeDropOutTcpIpJoiner.java | 2 +-
.../com/hazelcast/cluster/impl/MemberImpl.java | 15 +-
6 files changed, 389 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index 89134ce467..48dfb47476 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -306,6 +307,62 @@ public class SeaTunnelEngineClusterRoleTest {
}
}
+ @Test
+ public void testStartMasterNodeWithTcpIp() {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ HazelcastInstanceImpl instance =
+
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+ Assertions.assertNotNull(instance);
+ Assertions.assertEquals(1, instance.getCluster().getMembers().size());
+ instance.shutdown();
+ }
+
+ @Test
+ public void testStartMasterNodeWithMulticastJoin() {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
+ HazelcastInstanceImpl instance =
+
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+ Assertions.assertNotNull(instance);
+ Assertions.assertEquals(1, instance.getCluster().getMembers().size());
+ instance.shutdown();
+ }
+
+ @Test
+ public void testCannotOnlyStartWorkerNodeWithTcpIp() {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () -> {
+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ });
+ }
+
+ @Test
+ public void testCannotOnlyStartWorkerNodeWithMulticastJoin() {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () -> {
+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ });
+ }
+
+ private String getMulticastConfig() {
+ return "hazelcast:\n"
+ + " network:\n"
+ + " join:\n"
+ + " multicast:\n"
+ + " enabled: true\n"
+ + " multicast-group: 224.2.2.3\n"
+ + " multicast-port: 54327\n"
+ + " multicast-time-to-live: 32\n"
+ + " multicast-timeout-seconds: 2\n"
+ + " trusted-interfaces:\n"
+ + " - 192.168.1.1\n";
+ }
+
private SeaTunnelClient createSeaTunnelClient(String clusterName) {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(clusterName));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 60174b8864..3bfd8a41f7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -18,16 +18,21 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import
org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutDiscoveryJoiner;
+import
org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutMulticastJoiner;
+import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutTcpIpJoiner;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
import com.hazelcast.internal.cluster.Joiner;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
+import static
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -45,15 +50,28 @@ public class SeaTunnelNodeContext extends
DefaultNodeContext {
@Override
public Joiner createJoiner(Node node) {
+
JoinConfig join =
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();
- if (join.getTcpIpConfig().isEnabled()) {
+ // update for seatunnel, lite member can not become master node
+ if (join.getMulticastConfig().isEnabled() && node.multicastService !=
null) {
+ log.info("Using LiteNodeDropOutMulticast Multicast discovery");
+ return new LiteNodeDropOutMulticastJoiner(node);
+ } else if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
+ } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
+ || isAnyAliasedConfigEnabled(join)
+ || join.isAutoDetectionEnabled()) {
+ log.info("Using LiteNodeDropOutDiscoveryJoiner Discovery SPI");
+ return new LiteNodeDropOutDiscoveryJoiner(node);
}
+ return null;
+ }
- return super.createJoiner(node);
+ private boolean isAnyAliasedConfigEnabled(JoinConfig join) {
+ return
!AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
new file mode 100644
index 0000000000..a3ae66f372
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.impl.MemberImpl;
+import com.hazelcast.config.JoinConfig;
+import com.hazelcast.instance.EndpointQualifier;
+import com.hazelcast.instance.ProtocolType;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
+import com.hazelcast.internal.util.Preconditions;
+import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
+import com.hazelcast.internal.util.concurrent.IdleStrategy;
+import com.hazelcast.spi.discovery.DiscoveryNode;
+import com.hazelcast.spi.discovery.integration.DiscoveryService;
+import com.hazelcast.spi.properties.ClusterProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static
com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
+import static
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
+
+public class LiteNodeDropOutDiscoveryJoiner extends LiteNodeDropOutTcpIpJoiner
{
+
+ private final DiscoveryService discoveryService;
+ private final boolean usePublicAddress;
+ private final IdleStrategy idleStrategy;
+ private final int maximumWaitingTimeBeforeJoinSeconds;
+
+ public LiteNodeDropOutDiscoveryJoiner(Node node) {
+ super(node);
+ this.idleStrategy =
+ new BackoffIdleStrategy(
+ 0L,
+ 0L,
+ TimeUnit.MILLISECONDS.toNanos(10L),
+ TimeUnit.MILLISECONDS.toNanos(500L));
+ this.maximumWaitingTimeBeforeJoinSeconds =
+
node.getProperties().getInteger(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN);
+ this.discoveryService = node.discoveryService;
+ this.usePublicAddress =
usePublicAddress(node.getConfig().getNetworkConfig().getJoin());
+ }
+
+ private boolean usePublicAddress(JoinConfig join) {
+ return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
+ || allUsePublicAddress(
+
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
+ }
+
+ protected Collection<Address> getPossibleAddressesForInitialJoin() {
+ long deadLine =
+ System.nanoTime()
+ + TimeUnit.SECONDS.toNanos((long)
this.maximumWaitingTimeBeforeJoinSeconds);
+
+ for (int i = 0; System.nanoTime() < deadLine; ++i) {
+ Collection<Address> possibleAddresses =
this.getPossibleAddresses();
+ if (!possibleAddresses.isEmpty()) {
+ return possibleAddresses;
+ }
+
+ this.idleStrategy.idle((long) i);
+ }
+
+ return Collections.emptyList();
+ }
+
+ protected Collection<Address> getPossibleAddresses() {
+ Iterable<DiscoveryNode> discoveredNodes =
+ (Iterable)
+ Preconditions.checkNotNull(
+ this.discoveryService.discoverNodes(),
+ "Discovered nodes cannot be null!");
+ MemberImpl localMember = this.node.nodeEngine.getLocalMember();
+ Set<Address> localAddresses =
this.node.getLocalAddressRegistry().getLocalAddresses();
+ Collection<Address> possibleMembers = new ArrayList();
+ Iterator var5 = discoveredNodes.iterator();
+
+ while (var5.hasNext()) {
+ DiscoveryNode discoveryNode = (DiscoveryNode) var5.next();
+ Address discoveredAddress =
+ this.usePublicAddress
+ ? discoveryNode.getPublicAddress()
+ : discoveryNode.getPrivateAddress();
+ if (localAddresses.contains(discoveredAddress)) {
+ if (!this.usePublicAddress && discoveryNode.getPublicAddress()
!= null) {
+ localMember
+ .getAddressMap()
+ .put(
+
EndpointQualifier.resolve(ProtocolType.CLIENT, "public"),
+ this.publicAddress(localMember,
discoveryNode));
+ }
+ } else {
+ possibleMembers.add(discoveredAddress);
+ }
+ }
+
+ return possibleMembers;
+ }
+
+ private Address publicAddress(MemberImpl localMember, DiscoveryNode
discoveryNode) {
+ if (localMember.getAddressMap().containsKey(EndpointQualifier.CLIENT))
{
+ try {
+ String publicHost = discoveryNode.getPublicAddress().getHost();
+ int clientPort =
+ ((Address)
localMember.getAddressMap().get(EndpointQualifier.CLIENT))
+ .getPort();
+ return new Address(publicHost, clientPort);
+ } catch (Exception var5) {
+ Exception e = var5;
+ this.logger.fine(e);
+ }
+ }
+
+ return discoveryNode.getPublicAddress();
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
new file mode 100644
index 0000000000..47ec2818db
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.config.ConfigAccessor;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.cluster.impl.JoinRequest;
+import com.hazelcast.internal.cluster.impl.MulticastJoiner;
+import com.hazelcast.internal.util.Clock;
+import com.hazelcast.internal.util.RandomPicker;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class LiteNodeDropOutMulticastJoiner extends MulticastJoiner {
+
+ private static final long JOIN_RETRY_INTERVAL = 1000L;
+ private final AtomicInteger currentTryCount = new AtomicInteger(0);
+ private final AtomicInteger maxTryCount = new
AtomicInteger(calculateTryCount());
+
+ public LiteNodeDropOutMulticastJoiner(Node node) {
+ super(node);
+ }
+
+ @Override
+ public void doJoin() {
+ long joinStartTime = Clock.currentTimeMillis();
+ long maxJoinMillis = getMaxJoinMillis();
+ Address thisAddress = node.getThisAddress();
+
+ while (shouldRetry() && (Clock.currentTimeMillis() - joinStartTime <
maxJoinMillis)) {
+
+ // clear master node
+ clusterService.setMasterAddressToJoin(null);
+
+ Address masterAddress = getTargetAddress();
+ if (masterAddress == null) {
+ masterAddress = findMasterWithMulticast();
+ }
+ clusterService.setMasterAddressToJoin(masterAddress);
+
+ if (masterAddress == null || thisAddress.equals(masterAddress)) {
+ if (node.isLiteMember()) {
+ log.info("This node is lite member. No need to join to a
master node.");
+ continue;
+ } else {
+ clusterJoinManager.setThisMemberAsMaster();
+ return;
+ }
+ }
+
+ logger.info("Trying to join to discovered node: " + masterAddress);
+ joinMaster();
+ }
+ }
+
+ private void joinMaster() {
+ long maxMasterJoinTime = getMaxJoinTimeToMasterNode();
+ long start = Clock.currentTimeMillis();
+
+ while (shouldRetry() && Clock.currentTimeMillis() - start <
maxMasterJoinTime) {
+
+ Address master = clusterService.getMasterAddress();
+ if (master != null) {
+ if (logger.isFineEnabled()) {
+ logger.fine("Joining to master " + master);
+ }
+ clusterJoinManager.sendJoinRequest(master);
+ } else {
+ break;
+ }
+
+ try {
+ clusterService.blockOnJoin(JOIN_RETRY_INTERVAL);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (isBlacklisted(master)) {
+ clusterService.setMasterAddressToJoin(null);
+ return;
+ }
+ }
+ }
+
+ private Address findMasterWithMulticast() {
+ try {
+ if (this.logger.isFineEnabled()) {
+ this.logger.fine("Searching for master node. Max tries: " +
maxTryCount.get());
+ }
+
+ JoinRequest joinRequest = this.node.createJoinRequest((Address)
null);
+
+ while (this.node.isRunning()
+ && currentTryCount.incrementAndGet() <= maxTryCount.get())
{
+ joinRequest.setTryCount(currentTryCount.get());
+ this.node.multicastService.send(joinRequest);
+ Address masterAddress = this.clusterService.getMasterAddress();
+ if (masterAddress != null) {
+ Address var3 = masterAddress;
+ return var3;
+ }
+
+ Thread.sleep((long) this.getPublishInterval());
+ }
+
+ return null;
+ } catch (Exception var7) {
+ Exception e = var7;
+ if (this.logger != null) {
+ this.logger.warning(e);
+ }
+
+ return null;
+ } finally {
+ currentTryCount.set(0);
+ }
+ }
+
+ private int calculateTryCount() {
+ NetworkConfig networkConfig =
ConfigAccessor.getActiveMemberNetworkConfig(this.config);
+ long timeoutMillis =
+ TimeUnit.SECONDS.toMillis(
+ (long)
+ networkConfig
+ .getJoin()
+ .getMulticastConfig()
+ .getMulticastTimeoutSeconds());
+ int avgPublishInterval = 125;
+ int tryCount = (int) timeoutMillis / avgPublishInterval;
+ String host = this.node.getThisAddress().getHost();
+
+ int lastDigits;
+ try {
+ lastDigits = Integer.parseInt(host.substring(host.lastIndexOf(46)
+ 1));
+ } catch (NumberFormatException var9) {
+ lastDigits = RandomPicker.getInt(512);
+ }
+
+ int portDiff = this.node.getThisAddress().getPort() -
networkConfig.getPort();
+ tryCount += (lastDigits + portDiff) % 10;
+ return tryCount;
+ }
+
+ private int getPublishInterval() {
+ return RandomPicker.getInt(50, 200);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
similarity index 99%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
index 67aac64aca..afb3eab795 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.server.joiner;
import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
diff --git
a/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
b/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
index c5949ee925..ff7ca8e14e 100644
---
a/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
+++
b/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
@@ -286,21 +286,20 @@ public final class MemberImpl extends AbstractMember
sb.append(":");
sb.append(address.getPort());
sb.append(" - ").append(uuid);
- if (localMember()) {
- sb.append(" this");
- }
-
// update for seatunnel, add worker and master info
if (isLiteMember()) {
- sb.append(" worker");
+ sb.append(" [worker node]");
+ } else {
+ sb.append(" [master node]");
}
-
if (instance != null
&& instance.node.getClusterService().getMasterAddress() != null
&&
instance.node.getClusterService().getMasterAddress().equals(address)) {
- sb.append(" master");
+ sb.append(" [active master]");
+ }
+ if (localMember()) {
+ sb.append(" this");
}
-
// update for seatunnel, add worker and master info end
return sb.toString();
}