This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 87eb5c6f [FLINK-30518] Pass JM Pod IP to Flink when starting up with
Kubernetes HA enabled
87eb5c6f is described below
commit 87eb5c6f1855ef8fc1400039ce2517fc718f2f01
Author: Usamah Jassat <[email protected]>
AuthorDate: Mon Jan 30 15:36:02 2023 +0000
[FLINK-30518] Pass JM Pod IP to Flink when starting up with Kubernetes HA
enabled
---
.../CmdStandaloneJobManagerDecorator.java | 22 +++++++++---
.../StandaloneKubernetesJobManagerParameters.java | 11 ++++++
.../CmdStandaloneJobManagerDecoratorTest.java | 40 ++++++++++++++++++++++
3 files changed, 69 insertions(+), 4 deletions(-)
diff --git
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index d50a9bc2..3ecea774 100644
---
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -20,6 +20,7 @@ package
org.apache.flink.kubernetes.operator.kubeclient.decorators;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import
org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
import
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -36,6 +37,8 @@ public class CmdStandaloneJobManagerDecorator extends
AbstractKubernetesStepDeco
public static final String JOBMANAGER_ENTRYPOINT_ARG = "jobmanager";
public static final String APPLICATION_MODE_ARG = "standalone-job";
+    /**
+     * Container argument that references the pod IP environment variable, written as
+     * {@code $(NAME)} around {@link Constants#ENV_FLINK_POD_IP_ADDRESS}.
+     */
+    public static final String POD_IP_ARG = "$(" + Constants.ENV_FLINK_POD_IP_ADDRESS + ")";
private final StandaloneKubernetesJobManagerParameters
kubernetesJobManagerParameters;
@@ -56,10 +59,16 @@ public class CmdStandaloneJobManagerDecorator extends
AbstractKubernetesStepDeco
}
+    /**
+     * Builds the main container for a session-mode JobManager.
+     *
+     * <p>The command is set to the configured container entrypoint and the arguments are reset
+     * to {@link #JOBMANAGER_ENTRYPOINT_ARG}. When Kubernetes HA is enabled, {@link #POD_IP_ARG}
+     * is appended after it.
+     *
+     * @param mainContainer the container to decorate
+     * @return a container built from {@code mainContainer} with command and arguments set
+     */
     private Container decorateSessionContainer(Container mainContainer) {
-        return new ContainerBuilder(mainContainer)
-                .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
-                .withArgs(JOBMANAGER_ENTRYPOINT_ARG)
-                .build();
+        ContainerBuilder containerBuilder =
+                new ContainerBuilder(mainContainer)
+                        .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                        // withArgs (not addToArgs): arguments already present on the incoming
+                        // container are replaced, exactly as the removed code did.
+                        .withArgs(JOBMANAGER_ENTRYPOINT_ARG);
+
+        if (kubernetesJobManagerParameters.isKubernetesHA()) {
+            // Appended after the entrypoint argument; the value is a $(VAR) reference to the
+            // pod IP environment variable.
+            containerBuilder.addToArgs(POD_IP_ARG);
+        }
+
+        return containerBuilder.build();
     }
private Container decorateApplicationContainer(Container mainContainer) {
@@ -95,6 +104,11 @@ public class CmdStandaloneJobManagerDecorator extends
AbstractKubernetesStepDeco
args.addAll(kubernetesJobManagerParameters.getJobSpecArgs());
}
+ if (kubernetesJobManagerParameters.isKubernetesHA()) {
+ args.add("--host");
+ args.add(POD_IP_ARG);
+ }
+
return args;
}
}
diff --git
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index 15a164c8..3b887c0d 100644
---
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -20,6 +20,7 @@ package
org.apache.flink.kubernetes.operator.kubeclient.parameters;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
@@ -38,6 +39,10 @@ import java.util.Map;
*/
public class StandaloneKubernetesJobManagerParameters extends
KubernetesJobManagerParameters {
+ // Fully qualified factory class name that isKubernetesHA() accepts as an HA_MODE value
+ // (compared with an exact, case-sensitive match).
+ private static final String KUBERNETES_HA_FQN_FACTORY_CLASS =
+
"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";
+ // Short HA_MODE value that isKubernetesHA() accepts (compared case-insensitively).
+ private static final String KUBERNETES_HA_MODE = "KUBERNETES";
+
public StandaloneKubernetesJobManagerParameters(
Configuration flinkConfig, ClusterSpecification
clusterSpecification) {
super(flinkConfig, clusterSpecification);
@@ -107,4 +112,10 @@ public class StandaloneKubernetesJobManagerParameters
extends KubernetesJobManag
}
return null;
}
+
+    /**
+     * Reports whether the Flink configuration selects Kubernetes high availability.
+     *
+     * <p>The value of {@link HighAvailabilityOptions#HA_MODE} counts as Kubernetes HA when it
+     * equals {@code KUBERNETES_HA_FQN_FACTORY_CLASS} exactly or matches
+     * {@code KUBERNETES_HA_MODE} ignoring case.
+     *
+     * @return {@code true} if Kubernetes HA is configured; {@code false} otherwise, including
+     *     when the option resolves to no value
+     */
+    public boolean isKubernetesHA() {
+        String haMode = flinkConfig.getValue(HighAvailabilityOptions.HA_MODE);
+        // Constant-first comparisons: a null mode yields false instead of a
+        // NullPointerException.
+        return KUBERNETES_HA_FQN_FACTORY_CLASS.equals(haMode)
+                || KUBERNETES_HA_MODE.equalsIgnoreCase(haMode);
+    }
}
diff --git
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
index 6ffcd80b..599893b8 100644
---
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
+++
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -20,6 +20,7 @@ package
org.apache.flink.kubernetes.operator.kubeclient.decorators;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import
org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
@@ -105,4 +106,43 @@ public class CmdStandaloneJobManagerDecoratorTest {
decoratedPod.getMainContainer().getArgs(),
containsInAnyOrder(CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG));
}
+
+ /**
+ * Session mode with HA_MODE set to the Kubernetes HA factory class name: the decorated main
+ * container must have MOCK_ENTRYPATH as its command and carry both the jobmanager entrypoint
+ * argument and the pod IP argument.
+ */
+ @Test
+ public void testSessionKubernetesHAArgsAdded() {
+ // `configuration` and `decorator` are fixtures declared outside this hunk.
+ configuration.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+ // Select Kubernetes HA through the fully qualified factory class name.
+ configuration.set(
+ HighAvailabilityOptions.HA_MODE,
+
"org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
+
+ FlinkPod decoratedPod = decorator.decorateFlinkPod(new
FlinkPod.Builder().build());
+
+ assertThat(
+ decoratedPod.getMainContainer().getCommand(),
containsInAnyOrder(MOCK_ENTRYPATH));
+ // containsInAnyOrder checks which arguments are present, not their order.
+ assertThat(
+ decoratedPod.getMainContainer().getArgs(),
+ containsInAnyOrder(
+
CmdStandaloneJobManagerDecorator.JOBMANAGER_ENTRYPOINT_ARG,
+ CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+ }
+
+ /**
+ * Application mode with HA_MODE set to "KUBERNETES": the decorated main container's arguments
+ * must contain the application-mode argument, "--host" and the pod IP argument.
+ */
+ @Test
+ public void testApplicationKubernetesHAArgsAdded() {
+ // `configuration` and `decorator` are fixtures declared outside this hunk.
+ configuration.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+
+ // Select Kubernetes HA through the short mode name.
+ configuration.set(HighAvailabilityOptions.HA_MODE, "KUBERNETES");
+
+ FlinkPod decoratedPod = decorator.decorateFlinkPod(new
FlinkPod.Builder().build());
+
+ // containsInAnyOrder checks which arguments are present, not their order, so this does
+ // not verify that "--host" is immediately followed by the pod IP argument.
+ assertThat(
+ decoratedPod.getMainContainer().getArgs(),
+ containsInAnyOrder(
+ CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
+ "--host",
+ CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+ }
}