This is an automated email from the ASF dual-hosted git repository.
jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dff3ef4656 [Fix][Zeta] If Zeta not a TCP discovery, it cannot find
other members (#7757)
dff3ef4656 is described below
commit dff3ef4656b81ab74fb24b131035a2a32735c825
Author: Dongyeon Lee <[email protected]>
AuthorDate: Sat Sep 28 10:00:41 2024 +0900
[Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members
(#7757)
---
.../seatunnel/engine/e2e/k8s/KubernetesIT.java | 20 +++++++++++++++----
...st.yaml => hazelcast-kubernetes-discovery.yaml} | 11 ++++++-----
...hazelcast.yaml => hazelcast-tcp-discovery.yaml} | 0
.../engine/server/SeaTunnelNodeContext.java | 23 ++--------------------
4 files changed, 24 insertions(+), 30 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
index 2de7caeddb..ce2b73fb10 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
@@ -61,7 +61,18 @@ public class KubernetesIT {
private static final String podName = "seatunnel-0";
@Test
- public void test()
+ public void testTcpDiscovery()
+ throws IOException, XmlPullParserException, ApiException,
InterruptedException {
+ runDiscoveryTest("hazelcast-tcp-discovery.yaml");
+ }
+
+ @Test
+ public void testKubernetesDiscovery()
+ throws IOException, XmlPullParserException, ApiException,
InterruptedException {
+ runDiscoveryTest("hazelcast-kubernetes-discovery.yaml");
+ }
+
+ private void runDiscoveryTest(String hazelCastConfigFile)
throws IOException, XmlPullParserException, ApiException,
InterruptedException {
ApiClient client = Config.defaultClient();
AppsV1Api appsV1Api = new AppsV1Api(client);
@@ -82,7 +93,7 @@ public class KubernetesIT {
log.info("Docker's environmental information");
log.info(info.toString());
if
(dockerClient.listImagesCmd().withImageNameFilter(tag).exec().isEmpty()) {
- copyFileToCurrentResources(targetPath);
+ copyFileToCurrentResources(hazelCastConfigFile, targetPath);
File file =
new File(
PROJECT_ROOT_PATH
@@ -153,7 +164,8 @@ public class KubernetesIT {
}
}
- private void copyFileToCurrentResources(String targetPath) throws
IOException {
+ private void copyFileToCurrentResources(String hazelCastConfigFile, String
targetPath)
+ throws IOException {
File jarsPath = new File(targetPath + "/jars");
jarsPath.mkdirs();
File binPath = new File(targetPath + "/bin");
@@ -164,7 +176,7 @@ public class KubernetesIT {
new File(PROJECT_ROOT_PATH + "/config"), new File(targetPath +
"/config"));
// replace hazelcast.yaml and hazelcast-client.yaml
Files.copy(
- Paths.get(targetPath + "/custom_config/hazelcast.yaml"),
+ Paths.get(targetPath + "/custom_config/" +
hazelCastConfigFile),
Paths.get(targetPath + "/config/hazelcast.yaml"),
StandardCopyOption.REPLACE_EXISTING);
Files.copy(
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
similarity index 89%
copy from
seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
copy to
seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
index 10992ae39f..d4a79cd0e4 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
@@ -26,12 +26,13 @@ hazelcast:
DATA:
enabled: true
join:
- tcp-ip:
+ multicast:
+ enabled: false
+ kubernetes:
enabled: true
- member-list:
- - seatunnel-0.seatunnel.default.svc.cluster.local
- - seatunnel-1.seatunnel.default.svc.cluster.local
-
+ service-port: 5801
+ namespace: default
+ service-name: seatunnel
port:
auto-increment: true
port-count: 100
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml
similarity index 100%
rename from
seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
rename to
seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 8ea6b3cf5b..60174b8864 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -24,14 +24,10 @@ import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
import com.hazelcast.internal.cluster.Joiner;
-import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
-import static
com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
-import static
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
-import static
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -53,26 +49,11 @@ public class SeaTunnelNodeContext extends
DefaultNodeContext {
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();
- if (node.shouldUseMulticastJoiner(join) && node.multicastService !=
null) {
- super.createJoiner(node);
- } else if (join.getTcpIpConfig().isEnabled()) {
+ if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
- } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
- || isAnyAliasedConfigEnabled(join)
- || join.isAutoDetectionEnabled()) {
- super.createJoiner(node);
}
- return null;
- }
-
- private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
- return
!AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
- }
- private boolean usePublicAddress(JoinConfig join, Node node) {
- return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
- || allUsePublicAddress(
-
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
+ return super.createJoiner(node);
}
}