This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit ca848f9da9de9f075b0e12e00b057d6c8a85b771 Author: Usamah Jassat <[email protected]> AuthorDate: Tue Apr 26 18:41:41 2022 +0100 [FLINK-27443] Add k8s parameters and decorators for standalone JM and TM pods --- flink-kubernetes-shaded/pom.xml | 22 +++ .../pom.xml | 80 +++++----- .../CmdStandaloneJobManagerDecorator.java | 84 ++++++++++ .../CmdStandaloneTaskManagerDecorator.java | 51 ++++++ .../InitStandaloneTaskManagerDecorator.java | 145 +++++++++++++++++ .../decorators/UserLibMountDecorator.java | 81 ++++++++++ .../StandaloneKubernetesJobManagerParameters.java | 83 ++++++++++ .../StandaloneKubernetesTaskManagerParameters.java | 138 ++++++++++++++++ .../StandaloneKubernetesConfigOptionsInternal.java | 48 ++++++ .../operator/utils/StandaloneKubernetesUtils.java | 64 ++++++++ .../CmdStandaloneJobManagerDecoratorTest.java | 82 ++++++++++ .../CmdStandaloneTaskManagerDecoratorTest.java | 64 ++++++++ .../InitStandaloneTaskManagerDecoratorTest.java | 176 +++++++++++++++++++++ .../decorators/UserLibMountDecoratorTest.java | 80 ++++++++++ .../kubeclient/parameters/ParametersTestBase.java | 145 +++++++++++++++++ ...andaloneKubernetesJobManagerParametersTest.java | 105 ++++++++++++ ...ndaloneKubernetesTaskManagerParametersTest.java | 159 +++++++++++++++++++ .../operator/kubeclient/utils/TestUtils.java | 92 +++++++++++ pom.xml | 6 +- 19 files changed, 1667 insertions(+), 38 deletions(-) diff --git a/flink-kubernetes-shaded/pom.xml b/flink-kubernetes-shaded/pom.xml index 7fb94ef1..543043b3 100644 --- a/flink-kubernetes-shaded/pom.xml +++ b/flink-kubernetes-shaded/pom.xml @@ -43,6 +43,17 @@ under the License. 
</exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes-standalone</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> @@ -50,6 +61,7 @@ under the License. <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> + <version>3.3.0</version> <executions> <execution> <id>shade-flink-operator</id> @@ -69,6 +81,16 @@ under the License. <shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern> </relocation> </relocations> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/DEPENDENCIES</exclude> + <exclude>META-INF/LICENSE</exclude> + <exclude>META-INF/MANIFEST.MF</exclude> + </excludes> + </filter> + </filters> </configuration> </execution> </executions> diff --git a/flink-kubernetes-shaded/pom.xml b/flink-kubernetes-standalone/pom.xml similarity index 50% copy from flink-kubernetes-shaded/pom.xml copy to flink-kubernetes-standalone/pom.xml index 7fb94ef1..f2a1c2f6 100644 --- a/flink-kubernetes-shaded/pom.xml +++ b/flink-kubernetes-standalone/pom.xml @@ -20,18 +20,27 @@ under the License. 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> + <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-kubernetes-operator-parent</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> + <artifactId>flink-kubernetes-operator-parent</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> </parent> - <artifactId>flink-kubernetes-shaded</artifactId> - <name>Flink Kubernetes Shaded</name> + + + <artifactId>flink-kubernetes-standalone</artifactId> + <name>Flink Kubernetes Standalone</name> <packaging>jar</packaging> <dependencies> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>${fabric8.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-kubernetes</artifactId> @@ -43,36 +52,33 @@ under the License. 
</exclusion> </exclusions> </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink-operator</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <artifactSet> - <includes combine.children="append"> - <include>*:*</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>io.fabric8</pattern> - <shadedPattern>org.apache.flink.kubernetes.shaded.io.fabric8</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- Test --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.jupiter.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-server-mock</artifactId> + <version>${fabric8.version}</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java new file mode 100644 index 00000000..430abe2e --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Attach the command and args to the main container for running the JobManager in standalone mode. 
+ */
+public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDecorator {
+
+    public static final String JOBMANAGER_ENTRYPOINT_ARG = "jobmanager"; // the only container arg used for session clusters
+    public static final String APPLICATION_MODE_ARG = "standalone-job"; // the first container arg used for application clusters
+
+    private final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters;
+
+    public CmdStandaloneJobManagerDecorator(
+            StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { // rebuilds the FlinkPod, replacing only its main container
+        final Container mainContainerWithStartCmd;
+        if (kubernetesJobManagerParameters.isApplicationCluster()) { // the cluster mode selects which args are attached
+            mainContainerWithStartCmd = decorateApplicationContainer(flinkPod.getMainContainer());
+        } else {
+            mainContainerWithStartCmd = decorateSessionContainer(flinkPod.getMainContainer());
+        }
+        return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
+    }
+
+    private Container decorateSessionContainer(Container mainContainer) { // session mode: <entrypoint> jobmanager
+        return new ContainerBuilder(mainContainer)
+                .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                .withArgs(JOBMANAGER_ENTRYPOINT_ARG)
+                .build();
+    }
+
+    private Container decorateApplicationContainer(Container mainContainer) { // application mode: <entrypoint> standalone-job [--job-classname <class>]
+        return new ContainerBuilder(mainContainer)
+                .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                .withArgs(getApplicationClusterArgs())
+                .build();
+    }
+
+    private List<String> getApplicationClusterArgs() { // builds the arg list for application mode
+        List<String> args = new ArrayList<>();
+        args.add(APPLICATION_MODE_ARG);
+
+        String mainClass = kubernetesJobManagerParameters.getMainClass();
+        if (mainClass != null) { // the class-name flag is emitted only when a main class is configured
+            args.add("--job-classname");
+            args.add(mainClass);
+        }
+
+        return args;
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java new file mode 100644 index 00000000..1533e0b2 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecorator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Attach the command and args to the main container for running the TaskManager in standalone mode. 
+ */
+public class CmdStandaloneTaskManagerDecorator extends AbstractKubernetesStepDecorator {
+    public static final String TASKMANAGER_ENTRYPOINT_ARG = "taskmanager"; // the only container arg attached by this decorator
+
+    private final StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters;
+
+    public CmdStandaloneTaskManagerDecorator(
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { // rebuilds the FlinkPod, replacing only its main container
+        final Container mainContainerWithStartCmd =
+                new ContainerBuilder(flinkPod.getMainContainer())
+                        .withCommand(kubernetesTaskManagerParameters.getContainerEntrypoint()) // command comes from the parameters
+                        .withArgs(TASKMANAGER_ENTRYPOINT_ARG)
+                        .build();
+        return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
new file mode 100644
index 00000000..adbc1a95
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecorator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesToleration; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.ResourceRequirements; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An initializer for the TaskManager {@link org.apache.flink.kubernetes.kubeclient.FlinkPod} in + * standalone mode. 
+ */
+public class InitStandaloneTaskManagerDecorator extends AbstractKubernetesStepDecorator {
+    private final StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters;
+
+    public InitStandaloneTaskManagerDecorator(
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        this.kubernetesTaskManagerParameters = checkNotNull(kubernetesTaskManagerParameters);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { // rebuilds both the pod (without main container) and the main container
+        final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields: API version and service account are set from the parameters
+        final String serviceAccountName = kubernetesTaskManagerParameters.getServiceAccount();
+
+        basicPodBuilder
+                .withApiVersion(Constants.API_VERSION)
+                .editOrNewSpec()
+                .withServiceAccount(serviceAccountName) // both service-account fields receive the same name
+                .withServiceAccountName(serviceAccountName)
+                .endSpec();
+
+        // Merge fields: labels, annotations, pull secrets, node selector and tolerations are added to what the pod already has
+        basicPodBuilder
+                .editOrNewMetadata()
+                .addToLabels(kubernetesTaskManagerParameters.getLabels())
+                .addToAnnotations(kubernetesTaskManagerParameters.getAnnotations())
+                .endMetadata()
+                .editOrNewSpec()
+                .addToImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
+                .addToNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
+                .addAllToTolerations(
+                        kubernetesTaskManagerParameters.getTolerations().stream()
+                                .map(e -> KubernetesToleration.fromMap(e).getInternalResource()) // each toleration map becomes a Kubernetes toleration object
+                                .collect(Collectors.toList()))
+                .endSpec();
+
+        final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());
+
+        return new FlinkPod.Builder(flinkPod)
+                .withPod(basicPodBuilder.build())
+                .withMainContainer(basicMainContainer)
+                .build();
+    }
+
+    private Container decorateMainContainer(Container container) { // sets name, image, pull policy, resources, RPC port and env on the main container
+        final ContainerBuilder mainContainerBuilder = new ContainerBuilder(container);
+
+        // Overwrite fields: name, image, pull policy and resources are set from the parameters
+        final ResourceRequirements requirementsInPodTemplate =
+                container.getResources() == null
+                        ? new ResourceRequirements() // no resources on the incoming container: start from an empty requirement set
+                        : container.getResources();
+        final ResourceRequirements resourceRequirements =
+                KubernetesUtils.getResourceRequirements(
+                        requirementsInPodTemplate,
+                        kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
+                        kubernetesTaskManagerParameters.getMemoryLimitFactor(),
+                        kubernetesTaskManagerParameters.getTaskManagerCPU(),
+                        kubernetesTaskManagerParameters.getCpuLimitFactor(),
+                        Collections.emptyMap(), // the last two arguments are always passed as empty maps here
+                        Collections.emptyMap());
+        final String image = kubernetesTaskManagerParameters.getImage();
+        final String imagePullPolicy = kubernetesTaskManagerParameters.getImagePullPolicy().name();
+        mainContainerBuilder
+                .withName(Constants.MAIN_CONTAINER_NAME)
+                .withImage(image)
+                .withImagePullPolicy(imagePullPolicy)
+                .withResources(resourceRequirements);
+
+        // Merge fields: the RPC port and environment variables are appended to the existing ones
+        mainContainerBuilder
+                .addToPorts(
+                        new ContainerPortBuilder()
+                                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                                .withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
+                                .build())
+                .addAllToEnv(getCustomizedEnvs());
+        getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv); // the log-dir variable is added only when a log dir is configured
+
+        return mainContainerBuilder.build();
+    }
+
+    private List<EnvVar> getCustomizedEnvs() { // one EnvVar per entry of the parameters' environment map
+        return kubernetesTaskManagerParameters.getEnvironments().entrySet().stream()
+                .map(
+                        kv ->
+                                new EnvVarBuilder()
+                                        .withName(kv.getKey())
+                                        .withValue(kv.getValue())
+                                        .build())
+                .collect(Collectors.toList());
+    }
+
+    private Optional<EnvVar> getFlinkLogDirEnv() { // empty when getFlinkLogDirInPod() is empty
+        return kubernetesTaskManagerParameters
+                .getFlinkLogDirInPod()
+                .map(logDir -> new EnvVar(Constants.ENV_FLINK_LOG_DIR, logDir, null));
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
new file mode 100644
index 00000000..a66026b2
--- /dev/null
+++
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Mount the Flink User Lib directory to enable Flink to pick up a Jars defined in + * pipeline.classpaths. 
Used for starting standalone application clusters + */ +public class UserLibMountDecorator extends AbstractKubernetesStepDecorator { + private static final String USER_LIB_VOLUME = "user-lib-dir"; + private static final String USER_LIB_PATH = "/opt/flink/usrlib"; + + private final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters; + + public UserLibMountDecorator( + StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters) { + this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + if (!kubernetesJobManagerParameters.isApplicationCluster()) { + return flinkPod; + } + + final Volume userLibVolume = + new VolumeBuilder() + .withName(USER_LIB_VOLUME) + .withNewEmptyDir() + .endEmptyDir() + .build(); + + final Pod pod = + new PodBuilder(flinkPod.getPodWithoutMainContainer()) + .editSpec() + .addNewVolumeLike(userLibVolume) + .endVolume() + .endSpec() + .build(); + + final Container mountedMainContainer = + new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(USER_LIB_VOLUME) + .withMountPath(USER_LIB_PATH) + .endVolumeMount() + .build(); + return new FlinkPod.Builder(flinkPod) + .withPod(pod) + .withMainContainer(mountedMainContainer) + .build(); + } +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java new file mode 100644 index 00000000..bdc28d48 --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.parameters; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A Utility class that helps to parse, verify and manage the Kubernetes parameters that are used + * for constructing the JobManager deployment used for standalone cluster deployments. 
+ */ +public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManagerParameters { + + public StandaloneKubernetesJobManagerParameters( + Configuration flinkConfig, ClusterSpecification clusterSpecification) { + super(flinkConfig, clusterSpecification); + } + + @Override + public Map<String, String> getLabels() { + final Map<String, String> labels = new HashMap<>(); + labels.putAll(getSelectors()); + labels.putAll( + flinkConfig + .getOptional(KubernetesConfigOptions.JOB_MANAGER_LABELS) + .orElse(Collections.emptyMap())); + return Collections.unmodifiableMap(labels); + } + + @Override + public Map<String, String> getSelectors() { + return Collections.unmodifiableMap( + StandaloneKubernetesUtils.getJobManagerSelectors(getClusterId())); + } + + @Override + public Map<String, String> getCommonLabels() { + return Collections.unmodifiableMap( + StandaloneKubernetesUtils.getCommonLabels(getClusterId())); + } + + @Override + public boolean isInternalServiceEnabled() { + return true; + } + + public boolean isApplicationCluster() { + return flinkConfig + .get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE) + .equals(StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + } + + public String getMainClass() { + if (!isApplicationCluster()) { + return null; + } + return flinkConfig.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS); + } +} diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java new file mode 100644 index 00000000..692614bf --- /dev/null +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParameters.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.parameters; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * A utility class that helps to parse, verify and manage the Kubernetes parameters that are used + * for constructing the TaskManager deployment used for standalone deployments. 
+ */
+public class StandaloneKubernetesTaskManagerParameters extends AbstractKubernetesParameters {
+    private final ClusterSpecification clusterSpecification;
+
+    public StandaloneKubernetesTaskManagerParameters(
+            Configuration flinkConfig, ClusterSpecification clusterSpecification) {
+        super(flinkConfig);
+        this.clusterSpecification = clusterSpecification;
+    }
+
+    @Override
+    public Map<String, String> getLabels() { // user-configured TaskManager labels merged with the selector labels
+        final Map<String, String> labels = new HashMap<>();
+        labels.putAll(
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_LABELS)
+                        .orElse(Collections.emptyMap()));
+        labels.putAll(getSelectors()); // selectors are applied last, so they win over user labels with the same key
+        return Collections.unmodifiableMap(labels);
+    }
+
+    @Override
+    public Map<String, String> getSelectors() { // TaskManager selector labels for this cluster id
+        return StandaloneKubernetesUtils.getTaskManagerSelectors(getClusterId());
+    }
+
+    @Override
+    public Map<String, String> getNodeSelector() { // read-only; empty when the option is not set
+        return Collections.unmodifiableMap(
+                flinkConfig
+                        .getOptional(KubernetesConfigOptions.TASK_MANAGER_NODE_SELECTOR)
+                        .orElse(Collections.emptyMap()));
+    }
+
+    @Override
+    public Map<String, String> getEnvironments() {
+        // TMs have environment set using the pod template.
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<String, String> getAnnotations() { // empty when the option is not set
+        return flinkConfig
+                .getOptional(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS)
+                .orElse(Collections.emptyMap());
+    }
+
+    @Override
+    public List<Map<String, String>> getTolerations() { // empty when the option is not set
+        return flinkConfig
+                .getOptional(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS)
+                .orElse(Collections.emptyList());
+    }
+
+    public int getReplicas() { // reads the internal replicas option and rejects values below 1
+        int replicas =
+                flinkConfig.get(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
+        Preconditions.checkArgument(
+                replicas > 0,
+                "'%s' should not be configured less than 1.",
+                StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS.key());
+        return replicas;
+    }
+
+    public String getServiceAccount() {
+        return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT);
+    }
+
+    public int getTaskManagerMemoryMB() { // taken from the cluster specification, not from flinkConfig
+        return clusterSpecification.getTaskManagerMemoryMB();
+    }
+
+    public double getTaskManagerCPU() {
+        return flinkConfig.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);
+    }
+
+    public int getRPCPort() { // parses the TaskManager RPC port option and rejects non-positive values
+        final int taskManagerRpcPort =
+                KubernetesUtils.parsePort(flinkConfig, TaskManagerOptions.RPC_PORT);
+        Preconditions.checkArgument(
+                taskManagerRpcPort > 0, "%s should not be 0.", TaskManagerOptions.RPC_PORT.key());
+        return taskManagerRpcPort;
+    }
+
+    public Optional<File> getPodTemplateFilePath() { // empty when no TaskManager pod template is configured
+        return flinkConfig
+                .getOptional(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)
+                .map(File::new);
+    }
+
+    public double getMemoryLimitFactor() {
+        return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_MEMORY_LIMIT_FACTOR);
+    }
+
+    public double getCpuLimitFactor() {
+        return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR);
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
new file mode 100644
index 00000000..bb1af0e9
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/StandaloneKubernetesConfigOptionsInternal.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds internal configuration constants used by flink operator when deploying flink
+ * clusters in standalone mode.
+ */
+public class StandaloneKubernetesConfigOptionsInternal {
+    public static final ConfigOption<Integer> KUBERNETES_TASKMANAGER_REPLICAS = // integer option, default 1
+            key("kubernetes.internal.taskmanager.replicas")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Specify how many pods will be in the TaskManager pool. For " + "standalone kubernetes Flink sessions clusters.");
+
+    public static final ConfigOption<ClusterMode> CLUSTER_MODE = // enum option, default APPLICATION
+            key("kubernetes.internal.cluster-mode")
+                    .enumType(ClusterMode.class)
+                    .defaultValue(ClusterMode.APPLICATION)
+                    .withDescription("Specify what mode the cluster will be deployed in.");
+
+    /** The different modes that a Flink cluster can be deployed in. */
+    public enum ClusterMode {
+        APPLICATION,
+        SESSION
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java
new file mode 100644
index 00000000..ca801e4c
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.utils.Constants;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Standalone Kubernetes Utils.
*/ +public class StandaloneKubernetesUtils { + + public static final String LABEL_TYPE_STANDALONE_TYPE = "flink-standalone-kubernetes"; + private static final String TM_DEPLOYMENT_POSTFIX = "-taskmanager"; + + public static String getTaskManagerDeploymentName(String clusterId) { + return clusterId + TM_DEPLOYMENT_POSTFIX; + } + + public static String getJobManagerDeploymentName(String clusterId) { + return clusterId; + } + + public static Map<String, String> getCommonLabels(String clusterId) { + Map<String, String> commonLabels = new HashMap<>(); + commonLabels.put(Constants.LABEL_TYPE_KEY, LABEL_TYPE_STANDALONE_TYPE); + commonLabels.put(Constants.LABEL_APP_KEY, clusterId); + return commonLabels; + } + + public static Map<String, String> getTaskManagerSelectors(String clusterId) { + Map<String, String> labels = getCommonLabels(clusterId); + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER); + return Collections.unmodifiableMap(labels); + } + + public static Map<String, String> getJobManagerSelectors(String clusterId) { + final Map<String, String> labels = getCommonLabels(clusterId); + labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER); + return Collections.unmodifiableMap(labels); + } + + public static Map<String, String> getConfigMapLabels(String clusterId, String type) { + Map<String, String> labels = new HashMap<>(getCommonLabels(clusterId)); + labels.put("configmap-type", type); + return Collections.unmodifiableMap(labels); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java new file mode 100644 index 00000000..49355574 --- /dev/null +++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +/** {@link CmdStandaloneJobManagerDecorator} unit tests. */ +public class CmdStandaloneJobManagerDecoratorTest { + + private static final String MOCK_ENTRYPATH = "./docker-entrypath"; + + private StandaloneKubernetesJobManagerParameters jmParameters; + private Configuration configuration; + private 
CmdStandaloneJobManagerDecorator decorator; + + @BeforeEach + public void setup() { + configuration = new Configuration(); + configuration.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, MOCK_ENTRYPATH); + jmParameters = + new StandaloneKubernetesJobManagerParameters( + configuration, + new ClusterSpecification.ClusterSpecificationBuilder() + .createClusterSpecification()); + + decorator = new CmdStandaloneJobManagerDecorator(jmParameters); + } + + @Test + public void testSessionCommandAdded() { + configuration.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build()); + assertThat( + decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH)); + assertThat( + decoratedPod.getMainContainer().getArgs(), + containsInAnyOrder(CmdStandaloneJobManagerDecorator.JOBMANAGER_ENTRYPOINT_ARG)); + } + + @Test + public void testApplicationCommandAdded() { + configuration.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build()); + assertThat( + decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH)); + assertThat( + decoratedPod.getMainContainer().getArgs(), + containsInAnyOrder(CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG)); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java new file mode 100644 index 00000000..09d623c0 --- /dev/null +++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneTaskManagerDecoratorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +/** {@link CmdStandaloneTaskManagerDecorator} unit tests. */ +public class CmdStandaloneTaskManagerDecoratorTest { + + private static final String MOCK_ENTRYPATH = "./docker-entrypath"; + + private StandaloneKubernetesTaskManagerParameters tmParameters; + private Configuration configuration; + private CmdStandaloneTaskManagerDecorator decorator; + + @BeforeEach + public void setup() { + 
configuration = new Configuration(); + configuration.setString(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH, MOCK_ENTRYPATH); + tmParameters = + new StandaloneKubernetesTaskManagerParameters( + configuration, + new ClusterSpecification.ClusterSpecificationBuilder() + .createClusterSpecification()); + + decorator = new CmdStandaloneTaskManagerDecorator(tmParameters); + } + + @Test + public void testCommandAdded() { + FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build()); + + assertThat( + decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH)); + assertThat( + decoratedPod.getMainContainer().getArgs(), + containsInAnyOrder(CmdStandaloneTaskManagerDecorator.TASKMANAGER_ENTRYPOINT_ARG)); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java new file mode 100644 index 00000000..36483adf --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/InitStandaloneTaskManagerDecoratorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.LocalObjectReference; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** {@link InitStandaloneTaskManagerDecorator} unit tests. */ +public class InitStandaloneTaskManagerDecoratorTest extends ParametersTestBase { + + private Pod resultPod; + private Container resultMainContainer; + + @BeforeEach + public void setup() { + setupFlinkConfig(); + + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + StandaloneKubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = + new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification); + InitStandaloneTaskManagerDecorator decorator = + new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters); + + final FlinkPod resultFlinkPod = decorator.decorateFlinkPod(createPodTemplate()); + resultPod = resultFlinkPod.getPodWithoutMainContainer(); + resultMainContainer = resultFlinkPod.getMainContainer(); + } + + @Test + public void testOverrideApiVersion() { + assertEquals(Constants.API_VERSION, resultPod.getApiVersion()); + } + + @Test + public void testMainContainerName() { + assertEquals(Constants.MAIN_CONTAINER_NAME, resultMainContainer.getName()); + } + + @Test + public void testOverrideMainContainerImage() { + assertEquals(TestUtils.IMAGE, resultMainContainer.getImage()); + } + + @Test + public void testOverrideMainContainerImagePullPolicy() { + assertEquals(TestUtils.IMAGE_POLICY, resultMainContainer.getImagePullPolicy()); + } + + @Test + public void testOverrideMainContainerResourceRequirements() { + final ResourceRequirements resourceRequirements = resultMainContainer.getResources(); + + final Map<String, Quantity> requests = resourceRequirements.getRequests(); + assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), requests.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), + requests.get("memory").getAmount()); + + final Map<String, Quantity> limits = resourceRequirements.getLimits(); + assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), limits.get("cpu").getAmount()); + assertEquals( + String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), limits.get("memory").getAmount()); + } + + @Test + public void testMergeMainContainerPorts() { + final List<ContainerPort> expectedContainerPorts = + Arrays.asList( + new ContainerPortBuilder() + .withName(Constants.TASK_MANAGER_RPC_PORT_NAME) + .withContainerPort(Constants.TASK_MANAGER_RPC_PORT) + .build(), + new 
ContainerPortBuilder() + .withName(TEMPLATE_PORT_NAME) + .withContainerPort(TEMPLATE_PORT) + .build()); + + assertThat( + resultMainContainer.getPorts(), + containsInAnyOrder(expectedContainerPorts.toArray())); + } + + @Test + public void testMergePodLabels() { + final Map<String, String> expectedLabels = + new HashMap<>( + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID)); + expectedLabels.putAll(userLabels); + expectedLabels.putAll(templateLabels); + + assertEquals(expectedLabels, resultPod.getMetadata().getLabels()); + } + + @Test + public void testMergePodAnnotations() { + final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations); + expectedAnnotations.putAll(templateAnnotations); + assertEquals(expectedAnnotations, resultPod.getMetadata().getAnnotations()); + } + + @Test + public void testOverridePodServiceAccountName() { + assertEquals(TestUtils.SERVICE_ACCOUNT, resultPod.getSpec().getServiceAccountName()); + } + + @Test + public void testMergeImagePullSecrets() { + final List<String> resultSecrets = + resultPod.getSpec().getImagePullSecrets().stream() + .map(LocalObjectReference::getName) + .collect(Collectors.toList()); + final List<String> expectedImagePullSecrets = new ArrayList<>(userImagePullSecrets); + expectedImagePullSecrets.addAll(templateImagePullSecrets); + + assertThat(resultSecrets, containsInAnyOrder(expectedImagePullSecrets.toArray())); + } + + @Test + public void testMergeNodeSelector() { + final Map<String, String> expectedNodeSelectors = new HashMap<>(userNodeSelectors); + expectedNodeSelectors.putAll(templateNodeSelector); + assertEquals(expectedNodeSelectors, resultPod.getSpec().getNodeSelector()); + } + + @Test + public void testEnvs() { + final List<EnvVar> envVars = resultMainContainer.getEnv(); + + final Map<String, String> envs = new HashMap<>(); + envVars.forEach(env -> envs.put(env.getName(), env.getValue())); + + assertEquals(templateEnvs, envs); + } +} diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java new file mode 100644 index 00000000..07c8af20 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.kubeclient.decorators; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; + +import io.fabric8.kubernetes.api.model.VolumeMount; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** {@link UserLibMountDecorator} unit tests. */ +public class UserLibMountDecoratorTest { + + @Test + public void testVolumeAdded() { + StandaloneKubernetesJobManagerParameters jmParameters = + createJmParamsWithClusterMode( + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + + UserLibMountDecorator decorator = new UserLibMountDecorator(jmParameters); + + FlinkPod baseFlinkPod = new FlinkPod.Builder().build(); + + assertEquals(0, baseFlinkPod.getMainContainer().getVolumeMounts().size()); + assertEquals(0, baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod); + assertEquals(1, decoratedPod.getMainContainer().getVolumeMounts().size()); + assertEquals(1, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + + VolumeMount volumeMount = decoratedPod.getMainContainer().getVolumeMounts().get(0); + + assertEquals("/opt/flink/usrlib", volumeMount.getMountPath()); + } + + @Test + public void testVolumeNotAdded() { + StandaloneKubernetesJobManagerParameters jmParameters = + createJmParamsWithClusterMode( + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); + UserLibMountDecorator decorator = new UserLibMountDecorator(jmParameters); + + FlinkPod baseFlinkPod = new FlinkPod.Builder().build(); + assertEquals(0, 
baseFlinkPod.getMainContainer().getVolumeMounts().size()); + assertEquals(0, baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + + FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod); + assertEquals(0, decoratedPod.getMainContainer().getVolumeMounts().size()); + assertEquals(0, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size()); + } + + private StandaloneKubernetesJobManagerParameters createJmParamsWithClusterMode( + StandaloneKubernetesConfigOptionsInternal.ClusterMode clusterMode) { + return new StandaloneKubernetesJobManagerParameters( + new Configuration() + .set(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, clusterMode), + new ClusterSpecification.ClusterSpecificationBuilder() + .createClusterSpecification()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java new file mode 100644 index 00000000..c3ffbafb --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/ParametersTestBase.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.parameters; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.utils.KubernetesUtils; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.LocalObjectReference; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.ResourceRequirements; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Base class for Kubernetes tests. 
*/ +public class ParametersTestBase { + + protected Configuration flinkConfig; + + protected ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + + protected final Map<String, String> userLabels = + TestUtils.generateTestStringStringMap("label", "value", 2); + + protected final Map<String, String> userAnnotations = + TestUtils.generateTestStringStringMap("annotation", "value", 2); + + protected final Map<String, String> userNodeSelectors = + TestUtils.generateTestStringStringMap("selector", "val", 2); + + protected final List<String> userImagePullSecrets = Arrays.asList("s1", "s2", "s3"); + + protected static final String TEMPLATE_PORT_NAME = "user-port"; + protected static final int TEMPLATE_PORT = 9458; + + protected final Map<String, String> templateEnvs = + TestUtils.generateTestStringStringMap("TEMPLATE_KEY", "VAL", 2); + + protected final Map<String, String> templateLabels = + TestUtils.generateTestStringStringMap("template-label", "value", 2); + + protected final Map<String, String> templateAnnotations = + TestUtils.generateTestStringStringMap("template-annotation", "value", 2); + + protected final Map<String, String> templateNodeSelector = + TestUtils.generateTestStringStringMap("template-node-selector", "value", 2); + + protected final List<String> templateImagePullSecrets = Arrays.asList("ts1", "ts2", "ts3"); + + private static final String SECRETS = "ssl-cert:/etc/ssl"; + + protected FlinkPod createPodTemplate() { + List<EnvVar> envVars = new ArrayList<>(); + templateEnvs.forEach( + (k, v) -> envVars.add(new EnvVarBuilder().withName(k).withValue(v).build())); + + Container mainContainer = + new ContainerBuilder() + .withImagePullPolicy("templatePullPolicy") + .withImage("templateImage") + .withResources( + KubernetesUtils.getResourceRequirements( + new ResourceRequirements(), + 1234, + 1, + 102, + 1, + Collections.emptyMap(), + Collections.emptyMap())) + .withPorts( + new ContainerPortBuilder() + 
.withName(TEMPLATE_PORT_NAME) + .withContainerPort(TEMPLATE_PORT) + .build()) + .withEnv(envVars) + .build(); + + return new FlinkPod.Builder() + .withMainContainer(mainContainer) + .withPod( + new PodBuilder() + .withApiVersion("templateAPIVersion") + .editOrNewSpec() + .withServiceAccountName("templateServiceAccountName") + .withServiceAccount("templateServiceAccount") + .endSpec() + .editOrNewMetadata() + .addToLabels(templateLabels) + .addToAnnotations(templateAnnotations) + .endMetadata() + .editOrNewSpec() + .addToImagePullSecrets(getImagePullSecrets()) + .addToNodeSelector(templateNodeSelector) + .endSpec() + .build()) + .build(); + } + + private LocalObjectReference[] getImagePullSecrets() { + return templateImagePullSecrets.stream() + .map(String::trim) + .filter(secret -> !secret.isEmpty()) + .map(LocalObjectReference::new) + .toArray(LocalObjectReference[]::new); + } + + protected void setupFlinkConfig() { + flinkConfig = TestUtils.createTestFlinkConfig(); + flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, userAnnotations); + flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, userAnnotations); + flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels); + flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_LABELS, userLabels); + flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_NODE_SELECTOR, userNodeSelectors); + flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, userNodeSelectors); + flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, userImagePullSecrets); + flinkConfig.setString(KubernetesConfigOptions.KUBERNETES_SECRETS.key(), SECRETS); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java new file mode 
100644 index 00000000..cf6c0250 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParametersTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.kubeclient.parameters; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** {@link StandaloneKubernetesJobManagerParameters} unit tests. */ +public class StandaloneKubernetesJobManagerParametersTest extends ParametersTestBase { + private StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters; + + @BeforeEach + public void setup() { + setupFlinkConfig(); + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + kubernetesJobManagerParameters = + new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification); + } + + @Test + public void testGetLabels() { + Map<String, String> expectedLabels = new HashMap<>(); + expectedLabels.putAll(userLabels); + expectedLabels.putAll( + StandaloneKubernetesUtils.getJobManagerSelectors(TestUtils.CLUSTER_ID)); + assertEquals(expectedLabels, kubernetesJobManagerParameters.getLabels()); + } + + @Test + public void testGetSelectors() { + assertEquals( + StandaloneKubernetesUtils.getJobManagerSelectors(TestUtils.CLUSTER_ID), + kubernetesJobManagerParameters.getSelectors()); + } + + @Test + public void testGetCommonLabels() { + assertEquals( + StandaloneKubernetesUtils.getCommonLabels(TestUtils.CLUSTER_ID), + 
kubernetesJobManagerParameters.getCommonLabels()); + } + + @Test + public void testIsInternalServiceEnabled() { + assertTrue(kubernetesJobManagerParameters.isInternalServiceEnabled()); + } + + @Test + public void testIsApplicationCluster() { + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + assertTrue(kubernetesJobManagerParameters.isApplicationCluster()); + + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); + assertFalse(kubernetesJobManagerParameters.isApplicationCluster()); + } + + @Test + public void testGetMainClass() { + String entryClass = "my.main.test.class"; + flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, entryClass); + + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION); + assertNull(kubernetesJobManagerParameters.getMainClass()); + + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE, + StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION); + assertEquals(entryClass, kubernetesJobManagerParameters.getMainClass()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java new file mode 100644 index 00000000..c794a630 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesTaskManagerParametersTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.parameters; + +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils; +import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; +import org.apache.flink.kubernetes.utils.Constants; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link StandaloneKubernetesTaskManagerParameters}. */ +public class StandaloneKubernetesTaskManagerParametersTest extends ParametersTestBase { + + private StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters; + + @BeforeEach + public void setup() { + setupFlinkConfig(); + kubernetesTaskManagerParameters = + new 
StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification); + } + + @Test + public void testGetLabels() { + Map<String, String> expectedLabels = new HashMap<>(); + expectedLabels.putAll(userLabels); + expectedLabels.putAll( + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID)); + assertEquals(expectedLabels, kubernetesTaskManagerParameters.getLabels()); + } + + @Test + public void testGetSelectors() { + Map<String, String> expectedSelectors = + StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID); + + assertEquals(expectedSelectors, kubernetesTaskManagerParameters.getSelectors()); + } + + @Test + public void testGetNodeSelector() { + assertEquals(userNodeSelectors, kubernetesTaskManagerParameters.getNodeSelector()); + } + + @Test + public void testGetEnvironments() { + assertTrue(kubernetesTaskManagerParameters.getEnvironments().isEmpty()); + } + + @Test + public void testGetAnnotations() { + assertEquals(userAnnotations, kubernetesTaskManagerParameters.getAnnotations()); + } + + @Test + public void testGetReplicas() { + int tmReplicas = 11; + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS, + tmReplicas); + assertEquals(tmReplicas, kubernetesTaskManagerParameters.getReplicas()); + } + + @Test + public void testInvalidReplicas() { + flinkConfig.set( + StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS, -1); + kubernetesTaskManagerParameters = + new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification); + + assertThrows( + IllegalArgumentException.class, + () -> { + kubernetesTaskManagerParameters.getReplicas(); + }); + } + + @Test + public void testGetServiceAccount() { + assertEquals( + TestUtils.SERVICE_ACCOUNT, kubernetesTaskManagerParameters.getServiceAccount()); + } + + @Test + public void testGetTaskManagerMemoryMB() { + assertEquals( + TestUtils.TASK_MANAGER_MEMORY_MB, + 
kubernetesTaskManagerParameters.getTaskManagerMemoryMB()); + } + + @Test + public void testGetTaskManagerCPU() { + assertEquals( + TestUtils.TASK_MANAGER_CPU, + kubernetesTaskManagerParameters.getTaskManagerCPU(), + 0.00001); + } + + @Test + public void testGetGetRPCPort() { + assertEquals(Constants.TASK_MANAGER_RPC_PORT, kubernetesTaskManagerParameters.getRPCPort()); + } + + @Test + public void testInvalidRPCPort() { + flinkConfig.set(TaskManagerOptions.RPC_PORT, String.valueOf(0)); + assertThrows( + IllegalArgumentException.class, + () -> { + kubernetesTaskManagerParameters.getRPCPort(); + }); + } + + @Test + public void testGetPodTemplateFilePath() { + String templateFilePath = "/tmp/tst.yml"; + flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, templateFilePath); + Optional<File> templateFile = kubernetesTaskManagerParameters.getPodTemplateFilePath(); + assertTrue(templateFile.isPresent()); + String filePath = templateFile.get().getAbsolutePath(); + assertEquals(templateFilePath, filePath); + } + + @Test + public void testGetNoPodTemplateFilePath() { + flinkConfig.removeConfig(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE); + Optional<File> templateFile = kubernetesTaskManagerParameters.getPodTemplateFilePath(); + assertFalse(templateFile.isPresent()); + } +} diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java new file mode 100644 index 00000000..5a651930 --- /dev/null +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/utils/TestUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.kubeclient.utils; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.utils.Constants; + +import java.util.HashMap; +import java.util.Map; + +/** Test constants plus factories for the cluster specification and Flink configuration. 
*/ +public class TestUtils { + + public static final String CLUSTER_ID = "test-cluster"; + public static final String SERVICE_ACCOUNT = "flink-operator"; + public static final String TEST_NAMESPACE = "flink-operator-test"; + + public static final String TASK_MANAGER_MEMORY = "2048m"; + public static final String JOB_MANAGER_MEMORY = "1024m"; + + public static final String FLINK_VERSION = "latest"; + public static final String IMAGE = String.format("flink:%s", FLINK_VERSION); + public static final String IMAGE_POLICY = "IfNotPresent"; + + public static final int TASK_MANAGER_MEMORY_MB = + MemorySize.parse(TASK_MANAGER_MEMORY).getMebiBytes(); + public static final int JOB_MANAGER_MEMORY_MB = + MemorySize.parse(JOB_MANAGER_MEMORY).getMebiBytes(); + + public static final int SLOTS_PER_TASK_MANAGER = 2; + + public static final double TASK_MANAGER_CPU = 4; + public static final double JOB_MANAGER_CPU = 2; + + public static final Map<String, String> generateTestStringStringMap( + String keyPrefix, String valuePrefix, int entries) { + Map<String, String> map = new HashMap<>(); + for (int i = 1; i <= entries; i++) { + map.put(keyPrefix + i, valuePrefix + i); + } + return map; + } + + public static ClusterSpecification createClusterSpecification() { + return new ClusterSpecification.ClusterSpecificationBuilder() + .setMasterMemoryMB(JOB_MANAGER_MEMORY_MB) + .setTaskManagerMemoryMB(TASK_MANAGER_MEMORY_MB) + .setSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .createClusterSpecification(); + } + + public static Configuration createTestFlinkConfig() { + Configuration flinkConf = new Configuration(); + flinkConf.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID); + flinkConf.set(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE); + flinkConf.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT); + flinkConf.set(KubernetesConfigOptions.CONTAINER_IMAGE, IMAGE); + flinkConf.set( + KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, + 
KubernetesConfigOptions.ImagePullPolicy.valueOf(IMAGE_POLICY)); + + flinkConf.set(KubernetesConfigOptions.JOB_MANAGER_CPU, JOB_MANAGER_CPU); + flinkConf.set(KubernetesConfigOptions.TASK_MANAGER_CPU, TASK_MANAGER_CPU); + + flinkConf.setString( + TaskManagerOptions.RPC_PORT, String.valueOf(Constants.TASK_MANAGER_RPC_PORT)); + flinkConf.setString(BlobServerOptions.PORT, String.valueOf(Constants.BLOB_SERVER_PORT)); + flinkConf.setString(RestOptions.BIND_PORT, String.valueOf(Constants.REST_PORT)); + return flinkConf; + } +} diff --git a/pom.xml b/pom.xml index e548bfdd..6eddd541 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ under the License. </scm> <modules> + <module>flink-kubernetes-standalone</module> <module>flink-kubernetes-shaded</module> <module>flink-kubernetes-operator</module> <module>flink-kubernetes-webhook</module> @@ -62,7 +63,7 @@ under the License. <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> - <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version> + <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version> <maven-failsafe-plugin.version>3.0.0-M5</maven-failsafe-plugin.version> <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version> <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version> @@ -81,6 +82,9 @@ under the License. <spotless.version>2.4.2</spotless.version> <it.skip>true</it.skip> + + <hamcrest.version>1.3</hamcrest.version> + </properties> <dependencyManagement>
