This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 354da49 [FLINK-26714] Remove .sec from OperatorConfigOptions and use
Duration type config instead
354da49 is described below
commit 354da496d0142894ea0b5b32219f0ce89d7551a5
Author: SteNicholas <[email protected]>
AuthorDate: Fri Mar 18 12:06:40 2022 +0800
[FLINK-26714] Remove .sec from OperatorConfigOptions and use Duration type
config instead
---
.../config/FlinkOperatorConfiguration.java | 39 ++++++++++----------
.../operator/config/OperatorConfigOptions.java | 41 +++++++++++-----------
.../observer/JobManagerDeploymentStatus.java | 16 ++++-----
.../kubernetes/operator/utils/SavepointUtils.java | 7 ++--
.../controller/FlinkDeploymentControllerTest.java | 30 +++++++---------
.../operator/observer/JobObserverTest.java | 13 ++++---
.../operator/observer/SessionObserverTest.java | 23 ++++++++++--
.../conf/flink-operator-config/flink-conf.yaml | 8 ++---
helm/flink-operator/values.yaml | 4 +--
9 files changed, 98 insertions(+), 83 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 6e25075..016e494 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -24,36 +24,33 @@ import
org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import lombok.Value;
+import java.time.Duration;
import java.util.Set;
/** Configuration class for operator. */
@Value
public class FlinkOperatorConfiguration {
- int reconcileIntervalSeconds;
- int progressCheckIntervalSeconds;
- int restApiReadyDelaySeconds;
- int savepointTriggerGracePeriodSeconds;
+ Duration reconcileInterval;
+ Duration progressCheckInterval;
+ Duration restApiReadyDelay;
+ Duration savepointTriggerGracePeriod;
String flinkServiceHostOverride;
Set<String> watchedNamespaces;
public static FlinkOperatorConfiguration fromConfiguration(Configuration
operatorConfig) {
- int reconcileIntervalSeconds =
- operatorConfig.getInteger(
-
OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC);
+ Duration reconcileInterval =
+
operatorConfig.get(OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL);
- int restApiReadyDelaySeconds =
- operatorConfig.getInteger(
-
OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC);
+ Duration restApiReadyDelay =
+
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY);
- int progressCheckIntervalSeconds =
- operatorConfig.getInteger(
-
OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC);
+ Duration progressCheckInterval =
+
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL);
- int savepointTriggerGracePeriodSeconds =
- operatorConfig.getInteger(
- OperatorConfigOptions
-
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC);
+ Duration savepointTriggerGracePeriod =
+ operatorConfig.get(
+
OperatorConfigOptions.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD);
String flinkServiceHostOverride = null;
if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
@@ -64,10 +61,10 @@ public class FlinkOperatorConfiguration {
Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces();
return new FlinkOperatorConfiguration(
- reconcileIntervalSeconds,
- progressCheckIntervalSeconds,
- restApiReadyDelaySeconds,
- savepointTriggerGracePeriodSeconds,
+ reconcileInterval,
+ progressCheckInterval,
+ restApiReadyDelay,
+ savepointTriggerGracePeriod,
flinkServiceHostOverride,
watchedNamespaces);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 9e8948e..67f47e7 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -21,35 +21,36 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
+
/** This class holds configuration constants used by flink operator. */
public class OperatorConfigOptions {
- public static final ConfigOption<Integer>
OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC =
- ConfigOptions.key("operator.reconciler.reschedule.interval.sec")
- .intType()
- .defaultValue(60)
+ public static final ConfigOption<Duration>
OPERATOR_RECONCILER_RESCHEDULE_INTERVAL =
+ ConfigOptions.key("operator.reconciler.reschedule.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
.withDescription(
- "The interval in second for the controller to
reschedule the reconcile process");
+ "The interval for the controller to reschedule the
reconcile process");
- public static final ConfigOption<Integer>
OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC =
- ConfigOptions.key("operator.observer.rest-ready.delay.sec")
- .intType()
- .defaultValue(10)
+ public static final ConfigOption<Duration>
OPERATOR_OBSERVER_REST_READY_DELAY =
+ ConfigOptions.key("operator.observer.rest-ready.delay")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
.withDescription(
"Final delay before deployment is marked ready
after port becomes accessible.");
- public static final ConfigOption<Integer>
OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC =
- ConfigOptions.key("operator.observer.progress-check.interval.sec")
- .intType()
- .defaultValue(10)
+ public static final ConfigOption<Duration>
OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL =
+ ConfigOptions.key("operator.observer.progress-check.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
.withDescription(
"The interval for observing status for in-progress
operations such as deployment and savepoints.");
- public static final ConfigOption<Integer>
- OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC =
-
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period.sec")
- .intType()
- .defaultValue(10)
- .withDescription(
- "The interval in seconds before a
savepoint trigger attempt is marked as unsuccessful");
+ public static final ConfigOption<Duration>
OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =
+
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription(
+ "The interval before a savepoint trigger attempt
is marked as unsuccessful");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index efd84d3..6d50154 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -44,27 +44,27 @@ public enum JobManagerDeploymentStatus {
public Duration rescheduleAfter(
FlinkDeployment flinkDeployment, FlinkOperatorConfiguration
operatorConfiguration) {
- int rescheduleAfterSec;
+ Duration rescheduleAfter;
switch (this) {
case DEPLOYING:
- rescheduleAfterSec =
operatorConfiguration.getProgressCheckIntervalSeconds();
+ rescheduleAfter =
operatorConfiguration.getProgressCheckInterval();
break;
case READY:
- rescheduleAfterSec =
+ rescheduleAfter =
SavepointUtils.savepointInProgress(flinkDeployment)
- ?
operatorConfiguration.getProgressCheckIntervalSeconds()
- :
operatorConfiguration.getReconcileIntervalSeconds();
+ ?
operatorConfiguration.getProgressCheckInterval()
+ : operatorConfiguration.getReconcileInterval();
break;
case MISSING:
case ERROR:
- rescheduleAfterSec =
operatorConfiguration.getReconcileIntervalSeconds();
+ rescheduleAfter = operatorConfiguration.getReconcileInterval();
break;
case DEPLOYED_NOT_READY:
- rescheduleAfterSec =
operatorConfiguration.getRestApiReadyDelaySeconds();
+ rescheduleAfter = operatorConfiguration.getRestApiReadyDelay();
break;
default:
throw new RuntimeException("Unknown status: " + this);
}
- return Duration.ofSeconds(rescheduleAfterSec);
+ return rescheduleAfter;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 70390f3..1caa512 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -21,7 +21,7 @@ import
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
/** Savepoint utilities. */
public class SavepointUtils {
@@ -50,9 +50,8 @@ public class SavepointUtils {
public static boolean gracePeriodEnded(
FlinkOperatorConfiguration configuration, SavepointInfo
savepointInfo) {
- int gracePeriod =
configuration.getSavepointTriggerGracePeriodSeconds();
+ Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
long triggerTimestamp = savepointInfo.getTriggerTimestamp();
- return (System.currentTimeMillis() - triggerTimestamp)
- > TimeUnit.SECONDS.toMillis(gracePeriod);
+ return (System.currentTimeMillis() - triggerTimestamp) >
gracePeriod.toMillis();
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3c1af8c..6093a8e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -52,12 +52,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.HttpURLConnection;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +73,13 @@ public class FlinkDeploymentControllerTest {
private final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
private final FlinkOperatorConfiguration operatorConfiguration =
- new FlinkOperatorConfiguration(1, 2, 3, 4, null,
Collections.emptySet());
+ new FlinkOperatorConfiguration(
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(2),
+ Duration.ofSeconds(3),
+ Duration.ofSeconds(4),
+ null,
+ Collections.emptySet());
private TestingFlinkService flinkService;
private FlinkDeploymentController testController;
@@ -96,9 +102,7 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster,
TestUtils.createEmptyContext());
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- Optional.of(
- TimeUnit.SECONDS.toMillis(
-
operatorConfiguration.getProgressCheckIntervalSeconds())),
+
Optional.of(operatorConfiguration.getProgressCheckInterval().toMillis()),
updateControl.getScheduleDelay());
// Validate reconciliation status
@@ -111,17 +115,13 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- Optional.of(
- TimeUnit.SECONDS.toMillis(
-
operatorConfiguration.getRestApiReadyDelaySeconds())),
+
Optional.of(operatorConfiguration.getRestApiReadyDelay().toMillis()),
updateControl.getScheduleDelay());
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- Optional.of(
- TimeUnit.SECONDS.toMillis(
-
operatorConfiguration.getReconcileIntervalSeconds())),
+
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
updateControl.getScheduleDelay());
// Validate job status
@@ -171,9 +171,7 @@ public class FlinkDeploymentControllerTest {
appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- Optional.of(
- TimeUnit.SECONDS.toMillis(
-
operatorConfiguration.getReconcileIntervalSeconds())),
+
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
updateControl.getScheduleDelay());
RecordedRequest recordedRequest = mockServer.getLastRequest();
@@ -242,9 +240,7 @@ public class FlinkDeploymentControllerTest {
appCluster,
TestUtils.createContextWithInProgressDeployment());
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- Optional.of(
- TimeUnit.SECONDS.toMillis(
-
operatorConfiguration.getReconcileIntervalSeconds())),
+
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
updateControl.getScheduleDelay());
RecordedRequest recordedRequest = mockServer.getLastRequest();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 226ed8e..8a90cf2 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** {@link JobObserver} unit tests. */
public class JobObserverTest {
@@ -96,11 +97,15 @@ public class JobObserverTest {
assertEquals(
deployment.getMetadata().getName(),
deployment.getStatus().getJobStatus().getJobName());
- assertEquals(
+ assertTrue(
Long.valueOf(deployment.getStatus().getJobStatus().getUpdateTime())
- .compareTo(
-
Long.valueOf(deployment.getStatus().getJobStatus().getStartTime())),
- 1);
+ .compareTo(
+ Long.valueOf(
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getStartTime()))
+ >= 0);
// Test listing failure
flinkService.clear();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 1721671..1b79649 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -33,6 +33,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
@@ -87,13 +88,29 @@ public class SessionObserverTest {
.setLastReconciledSpec(deployment.getSpec());
FlinkOperatorConfiguration allNsConfig =
- new FlinkOperatorConfiguration(1, 2, 3, 4, null,
Collections.emptySet());
+ new FlinkOperatorConfiguration(
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(2),
+ Duration.ofSeconds(3),
+ Duration.ofSeconds(4),
+ null,
+ Collections.emptySet());
FlinkOperatorConfiguration specificNsConfig =
new FlinkOperatorConfiguration(
- 1, 2, 3, 4, null,
Set.of(deployment.getMetadata().getNamespace()));
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(2),
+ Duration.ofSeconds(3),
+ Duration.ofSeconds(4),
+ null,
+ Set.of(deployment.getMetadata().getNamespace()));
FlinkOperatorConfiguration multipleNsConfig =
new FlinkOperatorConfiguration(
- 1, 2, 3, 4, null,
Set.of(deployment.getMetadata().getNamespace(), "ns"));
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(2),
+ Duration.ofSeconds(3),
+ Duration.ofSeconds(4),
+ null,
+ Set.of(deployment.getMetadata().getNamespace(), "ns"));
Deployment k8sDeployment = new Deployment();
k8sDeployment.setSpec(new DeploymentSpec());
diff --git a/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
b/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
index 71ebacb..ae2fab8 100644
--- a/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
+++ b/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
@@ -16,10 +16,10 @@
# limitations under the License.
################################################################################
-# operator.reconciler.reschedule.interval.sec: 60
-# operator.observer.rest-ready.delay.sec: 10
-# operator.observer.progress-check.interval.sec: 10
-# operator.observer.savepoint.trigger.grace-period.sec: 10
+# operator.reconciler.reschedule.interval: 60 s
+# operator.observer.rest-ready.delay: 10 s
+# operator.observer.progress-check.interval: 10 s
+# operator.observer.savepoint.trigger.grace-period: 10 s
# metrics.reporter.slf4j.factory.class:
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
# metrics.reporter.slf4j.interval: 5 MINUTE
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index 56df161..8b90d6b 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -57,8 +57,8 @@ operatorConfiguration:
metrics.reporter.slf4j.factory.class:
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
metrics.reporter.slf4j.interval: 5 MINUTE
- operator.reconciler.reschedule.interval.sec: 15
- operator.observer.progress-check.interval.sec: 5
+ operator.reconciler.reschedule.interval: 15 s
+ operator.observer.progress-check.interval: 5 s
log4j2.properties: |+
rootLogger.level = DEBUG