This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new be2aaf7 [FLINK-28496] Label selector setting for operator
be2aaf7 is described below
commit be2aaf79c88d02de0638215b1b4ed9c5263e1951
Author: Márton Balassi <[email protected]>
AuthorDate: Thu Jul 14 16:43:22 2022 +0200
[FLINK-28496] Label selector setting for operator
---
.../kubernetes_operator_config_configuration.html | 6 ++
.../shortcodes/generated/system_section.html | 6 ++
.../flink/kubernetes/operator/FlinkOperator.java | 18 +++++-
.../config/FlinkOperatorConfiguration.java | 7 ++-
.../config/KubernetesOperatorConfigOptions.java | 9 +++
.../kubernetes/operator/FlinkOperatorTest.java | 67 ++++++++++++++--------
.../flink-kubernetes-operator/conf/flink-conf.yaml | 1 +
7 files changed, 85 insertions(+), 29 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index cfc1ba9..3912ef7 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -80,6 +80,12 @@
<td>Boolean</td>
<td>Enables last-state fallback for savepoint upgrade mode. When
the job is not running thus savepoint cannot be triggered but HA metadata is
available for last state restore the operator can initiate the upgrade process
when the flag is enabled.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.label.selector</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Label selector of the custom resources to be watched. Please
see
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
for the format supported.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/system_section.html
b/docs/layouts/shortcodes/generated/system_section.html
index 887326b..8b2dad2 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -26,6 +26,12 @@
<td>Duration</td>
<td>The timeout for the observer to wait the flink rest client to
return.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.label.selector</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Label selector of the custom resources to be watched. Please
see
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
for the format supported.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.reconcile.interval</h5></td>
<td style="word-wrap: break-word;">1 min</td>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index c0bc279..8933a6c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
@@ -70,7 +71,7 @@ public class FlinkOperator {
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
- private final Set<RegisteredController> registeredControllers = new
HashSet<>();
+ @VisibleForTesting final Set<RegisteredController> registeredControllers =
new HashSet<>();
private final KubernetesOperatorMetricGroup metricGroup;
private final Collection<FlinkResourceListener> listeners;
@@ -117,7 +118,8 @@ public class FlinkOperator {
}
}
- private void registerDeploymentController() {
+ @VisibleForTesting
+ void registerDeploymentController() {
var statusRecorder =
StatusRecorder.<FlinkDeploymentStatus>create(
client, new MetricManager<>(metricGroup,
configManager), listeners);
@@ -139,7 +141,8 @@ public class FlinkOperator {
registeredControllers.add(operator.register(controller,
this::overrideControllerConfigs));
}
- private void registerSessionJobController() {
+ @VisibleForTesting
+ void registerSessionJobController() {
var eventRecorder = EventRecorder.create(client, listeners);
var statusRecorder =
StatusRecorder.<FlinkSessionJobStatus>create(
@@ -156,11 +159,20 @@ public class FlinkOperator {
}
private void overrideControllerConfigs(ControllerConfigurationOverrider<?>
overrider) {
+ var watchNamespaces =
configManager.getOperatorConfiguration().getWatchedNamespaces();
+ LOG.info("Configuring operator to watch the following namespaces:
{}.", watchNamespaces);
overrider.settingNamespaces(
configManager.getOperatorConfiguration().getWatchedNamespaces());
+
overrider.withRetry(
GenericRetry.fromConfiguration(
configManager.getOperatorConfiguration().getRetryConfiguration()));
+
+ var labelSelector =
configManager.getOperatorConfiguration().getLabelSelector();
+ LOG.info(
+ "Configuring operator to select custom resources with the {}
labels.",
+ labelSelector);
+ overrider.withLabelSelector(labelSelector);
}
public void run() {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 45b9507..a4065bc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -53,6 +53,7 @@ public class FlinkOperatorConfiguration {
Integer savepointHistoryCountThreshold;
Duration savepointHistoryAgeThreshold;
RetryConfiguration retryConfiguration;
+ String labelSelector;
public static FlinkOperatorConfiguration fromConfiguration(Configuration
operatorConfig) {
Duration reconcileInterval =
@@ -125,6 +126,9 @@ public class FlinkOperatorConfiguration {
RetryConfiguration retryConfiguration = new
FlinkOperatorRetryConfiguration(operatorConfig);
+ String labelSelector =
+
operatorConfig.getString(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -142,7 +146,8 @@ public class FlinkOperatorConfiguration {
artifactsBaseDir,
savepointHistoryCountThreshold,
savepointHistoryAgeThreshold,
- retryConfiguration);
+ retryConfiguration,
+ labelSelector);
}
/** Enables configurable retry mechanism for reconciliation errors. */
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index d76aaef..774882a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -233,6 +233,15 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Comma separated list of namespaces the operator
monitors for custom resources.");
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<String> OPERATOR_LABEL_SELECTOR =
+ ConfigOptions.key("kubernetes.operator.label.selector")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Label selector of the custom resources to be
watched. Please see "
+ +
"https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
for the format supported.");
+
@Documentation.Section(SECTION_SYSTEM)
public static final ConfigOption<Boolean>
OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index 40e5412..cf5bb68 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -20,41 +20,58 @@ package org.apache.flink.kubernetes.operator;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
+import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
-/** @link FlinkOperator unit tests. */
+/**
+ * @link FlinkOperator unit tests. Since at the time of writing this the JOSDK
does not support
+ * overriding the configuration multiple times (has a singleton @link
+ * ConfigurationServiceProvider) we write multiple tests as a single
function, please provide
+ * ample comments.
+ */
public class FlinkOperatorTest {
@Test
- public void
testExecutorServiceUsesReconciliationMaxParallelismFromConfig() {
- checkExecutorServiceThreadCount(Optional.of(42), 42);
- // TODO: cannot override the operator configs twice in
java-operator-sdk v3
- Assertions.assertThrows(
- IllegalStateException.class,
- () -> checkExecutorServiceThreadCount(Optional.of(-1), 1));
- }
+ public void testConfigurationPassedToJOSDK() {
+ var testParallelism = 42;
+ var testSelector = "flink=enabled";
+ var operatorConfig = new Configuration();
- private void checkExecutorServiceThreadCount(
- Optional<Integer> parallelism, int expectedThreadCount) {
- var es = getExecutorForParallelismConfig(parallelism);
- Assertions.assertInstanceOf(ThreadPoolExecutor.class, es);
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) es;
- Assertions.assertEquals(expectedThreadCount,
threadPoolExecutor.getMaximumPoolSize());
- }
+ operatorConfig.setInteger(
+
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM,
testParallelism);
+
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR,
testSelector);
- private ExecutorService getExecutorForParallelismConfig(Optional<Integer>
parallelism) {
- var operatorConfig = new Configuration();
- parallelism.ifPresent(
- p ->
- operatorConfig.setInteger(
-
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM, p));
- new FlinkOperator(operatorConfig);
- return ConfigurationServiceProvider.instance().getExecutorService();
+ var testOperator = new FlinkOperator(operatorConfig);
+ testOperator.registerDeploymentController();
+ testOperator.registerSessionJobController();
+
+ // Test parallelism being passed
+ var executorService =
ConfigurationServiceProvider.instance().getExecutorService();
+ Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
executorService;
+ Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(),
testParallelism);
+
+ // Test label selector being passed
+ // We have a label selector for each controller
+ var labelSelectors =
+ testOperator.registeredControllers.stream()
+ .map(RegisteredController::getConfiguration)
+ .map(ControllerConfiguration::getLabelSelector);
+
+ labelSelectors.forEach(selector ->
Assertions.assertEquals(testSelector, selector));
+
+ // TODO: Overriding operator configuration twice in JOSDK v3 yields
IllegalStateException
+ var secondParallelism = 420;
+ var secondConfig = new Configuration();
+
+ secondConfig.setInteger(
+
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM,
secondParallelism);
+
+ Assertions.assertThrows(IllegalStateException.class, () -> new
FlinkOperator(secondConfig));
}
}
diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
index 5b47ec7..be6a85c 100644
--- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
+++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
@@ -40,6 +40,7 @@ parallelism.default: 2
# kubernetes.operator.user.artifacts.base.dir: /opt/flink/artifacts
# kubernetes.operator.job.upgrade.ignore-pending-savepoint: false
# kubernetes.operator.watched.namespaces: ns1,ns2
+# kubernetes.operator.label.selector: flink=enabled
# kubernetes.operator.dynamic.namespaces.enabled: false
# kubernetes.operator.retry.initial.interval: 5 s
# kubernetes.operator.retry.interval.multiplier: 2