This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a139595609 [improve] improve hazelcast joiner, lite node can't be 
election as master (#8261)
a139595609 is described below

commit a1395956097b7092b65e3e2420d086ef371b344c
Author: Jarvis <[email protected]>
AuthorDate: Fri Dec 13 20:48:45 2024 +0800

    [improve] improve hazelcast joiner, lite node can't be election as master 
(#8261)
---
 .../client/SeaTunnelEngineClusterRoleTest.java     |  57 +++++++
 .../engine/server/SeaTunnelNodeContext.java        |  22 ++-
 .../joiner/LiteNodeDropOutDiscoveryJoiner.java     | 137 +++++++++++++++++
 .../joiner/LiteNodeDropOutMulticastJoiner.java     | 167 +++++++++++++++++++++
 .../{ => joiner}/LiteNodeDropOutTcpIpJoiner.java   |   2 +-
 .../com/hazelcast/cluster/impl/MemberImpl.java     |  15 +-
 6 files changed, 389 insertions(+), 11 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index 89134ce467..48dfb47476 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -306,6 +307,62 @@ public class SeaTunnelEngineClusterRoleTest {
         }
     }
 
+    @Test
+    public void testStartMasterNodeWithTcpIp() {
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        HazelcastInstanceImpl instance =
+                
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+        Assertions.assertNotNull(instance);
+        Assertions.assertEquals(1, instance.getCluster().getMembers().size());
+        instance.shutdown();
+    }
+
+    @Test
+    public void testStartMasterNodeWithMulticastJoin() {
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
+        HazelcastInstanceImpl instance =
+                
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+        Assertions.assertNotNull(instance);
+        Assertions.assertEquals(1, instance.getCluster().getMembers().size());
+        instance.shutdown();
+    }
+
+    @Test
+    public void testCannotOnlyStartWorkerNodeWithTcpIp() {
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        Assertions.assertThrows(
+                IllegalStateException.class,
+                () -> {
+                    
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+                });
+    }
+
+    @Test
+    public void testCannotOnlyStartWorkerNodeWithMulticastJoin() {
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
+        Assertions.assertThrows(
+                IllegalStateException.class,
+                () -> {
+                    
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+                });
+    }
+
+    private String getMulticastConfig() {
+        return "hazelcast:\n"
+                + "  network:\n"
+                + "    join:\n"
+                + "      multicast:\n"
+                + "        enabled: true\n"
+                + "        multicast-group: 224.2.2.3\n"
+                + "        multicast-port: 54327\n"
+                + "        multicast-time-to-live: 32\n"
+                + "        multicast-timeout-seconds: 2\n"
+                + "        trusted-interfaces:\n"
+                + "          - 192.168.1.1\n";
+    }
+
     private SeaTunnelClient createSeaTunnelClient(String clusterName) {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         clientConfig.setClusterName(TestUtils.getClusterName(clusterName));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 60174b8864..3bfd8a41f7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -18,16 +18,21 @@
 package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import 
org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutDiscoveryJoiner;
+import 
org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutMulticastJoiner;
+import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutTcpIpJoiner;
 
 import com.hazelcast.config.JoinConfig;
 import com.hazelcast.instance.impl.DefaultNodeContext;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.instance.impl.NodeExtension;
 import com.hazelcast.internal.cluster.Joiner;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
+import static 
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
 
 @Slf4j
 public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -45,15 +50,28 @@ public class SeaTunnelNodeContext extends 
DefaultNodeContext {
 
     @Override
     public Joiner createJoiner(Node node) {
+
         JoinConfig join =
                 
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
         join.verify();
 
-        if (join.getTcpIpConfig().isEnabled()) {
+        // update for seatunnel, lite member can not become master node
+        if (join.getMulticastConfig().isEnabled() && node.multicastService != 
null) {
+            log.info("Using LiteNodeDropOutMulticast Multicast discovery");
+            return new LiteNodeDropOutMulticastJoiner(node);
+        } else if (join.getTcpIpConfig().isEnabled()) {
             log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
             return new LiteNodeDropOutTcpIpJoiner(node);
+        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
+                || isAnyAliasedConfigEnabled(join)
+                || join.isAutoDetectionEnabled()) {
+            log.info("Using LiteNodeDropOutDiscoveryJoiner Discovery SPI");
+            return new LiteNodeDropOutDiscoveryJoiner(node);
         }
+        return null;
+    }
 
-        return super.createJoiner(node);
+    private boolean isAnyAliasedConfigEnabled(JoinConfig join) {
+        return 
!AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
new file mode 100644
index 0000000000..a3ae66f372
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.impl.MemberImpl;
+import com.hazelcast.config.JoinConfig;
+import com.hazelcast.instance.EndpointQualifier;
+import com.hazelcast.instance.ProtocolType;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
+import com.hazelcast.internal.util.Preconditions;
+import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
+import com.hazelcast.internal.util.concurrent.IdleStrategy;
+import com.hazelcast.spi.discovery.DiscoveryNode;
+import com.hazelcast.spi.discovery.integration.DiscoveryService;
+import com.hazelcast.spi.properties.ClusterProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
+import static 
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
+
+public class LiteNodeDropOutDiscoveryJoiner extends LiteNodeDropOutTcpIpJoiner 
{
+
+    private final DiscoveryService discoveryService;
+    private final boolean usePublicAddress;
+    private final IdleStrategy idleStrategy;
+    private final int maximumWaitingTimeBeforeJoinSeconds;
+
+    public LiteNodeDropOutDiscoveryJoiner(Node node) {
+        super(node);
+        this.idleStrategy =
+                new BackoffIdleStrategy(
+                        0L,
+                        0L,
+                        TimeUnit.MILLISECONDS.toNanos(10L),
+                        TimeUnit.MILLISECONDS.toNanos(500L));
+        this.maximumWaitingTimeBeforeJoinSeconds =
+                
node.getProperties().getInteger(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN);
+        this.discoveryService = node.discoveryService;
+        this.usePublicAddress = 
usePublicAddress(node.getConfig().getNetworkConfig().getJoin());
+    }
+
+    private boolean usePublicAddress(JoinConfig join) {
+        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
+                || allUsePublicAddress(
+                        
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
+    }
+
+    protected Collection<Address> getPossibleAddressesForInitialJoin() {
+        long deadLine =
+                System.nanoTime()
+                        + TimeUnit.SECONDS.toNanos((long) 
this.maximumWaitingTimeBeforeJoinSeconds);
+
+        for (int i = 0; System.nanoTime() < deadLine; ++i) {
+            Collection<Address> possibleAddresses = 
this.getPossibleAddresses();
+            if (!possibleAddresses.isEmpty()) {
+                return possibleAddresses;
+            }
+
+            this.idleStrategy.idle((long) i);
+        }
+
+        return Collections.emptyList();
+    }
+
+    protected Collection<Address> getPossibleAddresses() {
+        Iterable<DiscoveryNode> discoveredNodes =
+                (Iterable)
+                        Preconditions.checkNotNull(
+                                this.discoveryService.discoverNodes(),
+                                "Discovered nodes cannot be null!");
+        MemberImpl localMember = this.node.nodeEngine.getLocalMember();
+        Set<Address> localAddresses = 
this.node.getLocalAddressRegistry().getLocalAddresses();
+        Collection<Address> possibleMembers = new ArrayList();
+        Iterator var5 = discoveredNodes.iterator();
+
+        while (var5.hasNext()) {
+            DiscoveryNode discoveryNode = (DiscoveryNode) var5.next();
+            Address discoveredAddress =
+                    this.usePublicAddress
+                            ? discoveryNode.getPublicAddress()
+                            : discoveryNode.getPrivateAddress();
+            if (localAddresses.contains(discoveredAddress)) {
+                if (!this.usePublicAddress && discoveryNode.getPublicAddress() 
!= null) {
+                    localMember
+                            .getAddressMap()
+                            .put(
+                                    
EndpointQualifier.resolve(ProtocolType.CLIENT, "public"),
+                                    this.publicAddress(localMember, 
discoveryNode));
+                }
+            } else {
+                possibleMembers.add(discoveredAddress);
+            }
+        }
+
+        return possibleMembers;
+    }
+
+    private Address publicAddress(MemberImpl localMember, DiscoveryNode 
discoveryNode) {
+        if (localMember.getAddressMap().containsKey(EndpointQualifier.CLIENT)) 
{
+            try {
+                String publicHost = discoveryNode.getPublicAddress().getHost();
+                int clientPort =
+                        ((Address) 
localMember.getAddressMap().get(EndpointQualifier.CLIENT))
+                                .getPort();
+                return new Address(publicHost, clientPort);
+            } catch (Exception var5) {
+                Exception e = var5;
+                this.logger.fine(e);
+            }
+        }
+
+        return discoveryNode.getPublicAddress();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
new file mode 100644
index 0000000000..47ec2818db
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.config.ConfigAccessor;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.cluster.impl.JoinRequest;
+import com.hazelcast.internal.cluster.impl.MulticastJoiner;
+import com.hazelcast.internal.util.Clock;
+import com.hazelcast.internal.util.RandomPicker;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class LiteNodeDropOutMulticastJoiner extends MulticastJoiner {
+
+    private static final long JOIN_RETRY_INTERVAL = 1000L;
+    private final AtomicInteger currentTryCount = new AtomicInteger(0);
+    private final AtomicInteger maxTryCount = new 
AtomicInteger(calculateTryCount());
+
+    public LiteNodeDropOutMulticastJoiner(Node node) {
+        super(node);
+    }
+
+    @Override
+    public void doJoin() {
+        long joinStartTime = Clock.currentTimeMillis();
+        long maxJoinMillis = getMaxJoinMillis();
+        Address thisAddress = node.getThisAddress();
+
+        while (shouldRetry() && (Clock.currentTimeMillis() - joinStartTime < 
maxJoinMillis)) {
+
+            // clear master node
+            clusterService.setMasterAddressToJoin(null);
+
+            Address masterAddress = getTargetAddress();
+            if (masterAddress == null) {
+                masterAddress = findMasterWithMulticast();
+            }
+            clusterService.setMasterAddressToJoin(masterAddress);
+
+            if (masterAddress == null || thisAddress.equals(masterAddress)) {
+                if (node.isLiteMember()) {
+                    log.info("This node is lite member. No need to join to a 
master node.");
+                    continue;
+                } else {
+                    clusterJoinManager.setThisMemberAsMaster();
+                    return;
+                }
+            }
+
+            logger.info("Trying to join to discovered node: " + masterAddress);
+            joinMaster();
+        }
+    }
+
+    private void joinMaster() {
+        long maxMasterJoinTime = getMaxJoinTimeToMasterNode();
+        long start = Clock.currentTimeMillis();
+
+        while (shouldRetry() && Clock.currentTimeMillis() - start < 
maxMasterJoinTime) {
+
+            Address master = clusterService.getMasterAddress();
+            if (master != null) {
+                if (logger.isFineEnabled()) {
+                    logger.fine("Joining to master " + master);
+                }
+                clusterJoinManager.sendJoinRequest(master);
+            } else {
+                break;
+            }
+
+            try {
+                clusterService.blockOnJoin(JOIN_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            if (isBlacklisted(master)) {
+                clusterService.setMasterAddressToJoin(null);
+                return;
+            }
+        }
+    }
+
+    private Address findMasterWithMulticast() {
+        try {
+            if (this.logger.isFineEnabled()) {
+                this.logger.fine("Searching for master node. Max tries: " + 
maxTryCount.get());
+            }
+
+            JoinRequest joinRequest = this.node.createJoinRequest((Address) 
null);
+
+            while (this.node.isRunning()
+                    && currentTryCount.incrementAndGet() <= maxTryCount.get()) 
{
+                joinRequest.setTryCount(currentTryCount.get());
+                this.node.multicastService.send(joinRequest);
+                Address masterAddress = this.clusterService.getMasterAddress();
+                if (masterAddress != null) {
+                    Address var3 = masterAddress;
+                    return var3;
+                }
+
+                Thread.sleep((long) this.getPublishInterval());
+            }
+
+            return null;
+        } catch (Exception var7) {
+            Exception e = var7;
+            if (this.logger != null) {
+                this.logger.warning(e);
+            }
+
+            return null;
+        } finally {
+            currentTryCount.set(0);
+        }
+    }
+
+    private int calculateTryCount() {
+        NetworkConfig networkConfig = 
ConfigAccessor.getActiveMemberNetworkConfig(this.config);
+        long timeoutMillis =
+                TimeUnit.SECONDS.toMillis(
+                        (long)
+                                networkConfig
+                                        .getJoin()
+                                        .getMulticastConfig()
+                                        .getMulticastTimeoutSeconds());
+        int avgPublishInterval = 125;
+        int tryCount = (int) timeoutMillis / avgPublishInterval;
+        String host = this.node.getThisAddress().getHost();
+
+        int lastDigits;
+        try {
+            lastDigits = Integer.parseInt(host.substring(host.lastIndexOf(46) 
+ 1));
+        } catch (NumberFormatException var9) {
+            lastDigits = RandomPicker.getInt(512);
+        }
+
+        int portDiff = this.node.getThisAddress().getPort() - 
networkConfig.getPort();
+        tryCount += (lastDigits + portDiff) % 10;
+        return tryCount;
+    }
+
+    private int getPublishInterval() {
+        return RandomPicker.getInt(50, 200);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
similarity index 99%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
index 67aac64aca..afb3eab795 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.server.joiner;
 
 import com.hazelcast.cluster.Address;
 import com.hazelcast.config.Config;
diff --git 
a/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
 
b/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
index c5949ee925..ff7ca8e14e 100644
--- 
a/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
+++ 
b/seatunnel-shade/seatunnel-hazelcast/seatunnel-hazelcast-shade/src/main/java/com/hazelcast/cluster/impl/MemberImpl.java
@@ -286,21 +286,20 @@ public final class MemberImpl extends AbstractMember
         sb.append(":");
         sb.append(address.getPort());
         sb.append(" - ").append(uuid);
-        if (localMember()) {
-            sb.append(" this");
-        }
-
         // update for seatunnel, add worker and master info
         if (isLiteMember()) {
-            sb.append(" worker");
+            sb.append(" [worker node]");
+        } else {
+            sb.append(" [master node]");
         }
-
         if (instance != null
                 && instance.node.getClusterService().getMasterAddress() != null
                 && 
instance.node.getClusterService().getMasterAddress().equals(address)) {
-            sb.append(" master");
+            sb.append(" [active master]");
+        }
+        if (localMember()) {
+            sb.append(" this");
         }
-
         // update for seatunnel, add worker and master info end
         return sb.toString();
     }

Reply via email to