This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 354da49  [FLINK-26714] Remove .sec from OperatorConfigOptions and use 
Duration type config instead
354da49 is described below

commit 354da496d0142894ea0b5b32219f0ce89d7551a5
Author: SteNicholas <[email protected]>
AuthorDate: Fri Mar 18 12:06:40 2022 +0800

    [FLINK-26714] Remove .sec from OperatorConfigOptions and use Duration type 
config instead
---
 .../config/FlinkOperatorConfiguration.java         | 39 ++++++++++----------
 .../operator/config/OperatorConfigOptions.java     | 41 +++++++++++-----------
 .../observer/JobManagerDeploymentStatus.java       | 16 ++++-----
 .../kubernetes/operator/utils/SavepointUtils.java  |  7 ++--
 .../controller/FlinkDeploymentControllerTest.java  | 30 +++++++---------
 .../operator/observer/JobObserverTest.java         | 13 ++++---
 .../operator/observer/SessionObserverTest.java     | 23 ++++++++++--
 .../conf/flink-operator-config/flink-conf.yaml     |  8 ++---
 helm/flink-operator/values.yaml                    |  4 +--
 9 files changed, 98 insertions(+), 83 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 6e25075..016e494 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -24,36 +24,33 @@ import 
org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 
 import lombok.Value;
 
+import java.time.Duration;
 import java.util.Set;
 
 /** Configuration class for operator. */
 @Value
 public class FlinkOperatorConfiguration {
 
-    int reconcileIntervalSeconds;
-    int progressCheckIntervalSeconds;
-    int restApiReadyDelaySeconds;
-    int savepointTriggerGracePeriodSeconds;
+    Duration reconcileInterval;
+    Duration progressCheckInterval;
+    Duration restApiReadyDelay;
+    Duration savepointTriggerGracePeriod;
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
-        int reconcileIntervalSeconds =
-                operatorConfig.getInteger(
-                        
OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC);
+        Duration reconcileInterval =
+                
operatorConfig.get(OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL);
 
-        int restApiReadyDelaySeconds =
-                operatorConfig.getInteger(
-                        
OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC);
+        Duration restApiReadyDelay =
+                
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_REST_READY_DELAY);
 
-        int progressCheckIntervalSeconds =
-                operatorConfig.getInteger(
-                        
OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC);
+        Duration progressCheckInterval =
+                
operatorConfig.get(OperatorConfigOptions.OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL);
 
-        int savepointTriggerGracePeriodSeconds =
-                operatorConfig.getInteger(
-                        OperatorConfigOptions
-                                
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC);
+        Duration savepointTriggerGracePeriod =
+                operatorConfig.get(
+                        
OperatorConfigOptions.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD);
 
         String flinkServiceHostOverride = null;
         if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
@@ -64,10 +61,10 @@ public class FlinkOperatorConfiguration {
         Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces();
 
         return new FlinkOperatorConfiguration(
-                reconcileIntervalSeconds,
-                progressCheckIntervalSeconds,
-                restApiReadyDelaySeconds,
-                savepointTriggerGracePeriodSeconds,
+                reconcileInterval,
+                progressCheckInterval,
+                restApiReadyDelay,
+                savepointTriggerGracePeriod,
                 flinkServiceHostOverride,
                 watchedNamespaces);
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
index 9e8948e..67f47e7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -21,35 +21,36 @@ package org.apache.flink.kubernetes.operator.config;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import java.time.Duration;
+
 /** This class holds configuration constants used by flink operator. */
 public class OperatorConfigOptions {
 
-    public static final ConfigOption<Integer> 
OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC =
-            ConfigOptions.key("operator.reconciler.reschedule.interval.sec")
-                    .intType()
-                    .defaultValue(60)
+    public static final ConfigOption<Duration> 
OPERATOR_RECONCILER_RESCHEDULE_INTERVAL =
+            ConfigOptions.key("operator.reconciler.reschedule.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
                     .withDescription(
-                            "The interval in second for the controller to 
reschedule the reconcile process");
+                            "The interval for the controller to reschedule the 
reconcile process");
 
-    public static final ConfigOption<Integer> 
OPERATOR_OBSERVER_REST_READY_DELAY_IN_SEC =
-            ConfigOptions.key("operator.observer.rest-ready.delay.sec")
-                    .intType()
-                    .defaultValue(10)
+    public static final ConfigOption<Duration> 
OPERATOR_OBSERVER_REST_READY_DELAY =
+            ConfigOptions.key("operator.observer.rest-ready.delay")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "Final delay before deployment is marked ready 
after port becomes accessible.");
 
-    public static final ConfigOption<Integer> 
OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL_IN_SEC =
-            ConfigOptions.key("operator.observer.progress-check.interval.sec")
-                    .intType()
-                    .defaultValue(10)
+    public static final ConfigOption<Duration> 
OPERATOR_OBSERVER_PROGRESS_CHECK_INTERVAL =
+            ConfigOptions.key("operator.observer.progress-check.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             "The interval for observing status for in-progress 
operations such as deployment and savepoints.");
 
-    public static final ConfigOption<Integer>
-            OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC =
-                    
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period.sec")
-                            .intType()
-                            .defaultValue(10)
-                            .withDescription(
-                                    "The interval in seconds before a 
savepoint trigger attempt is marked as unsuccessful");
+    public static final ConfigOption<Duration> 
OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =
+            
ConfigOptions.key("operator.observer.savepoint.trigger.grace-period")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription(
+                            "The interval before a savepoint trigger attempt 
is marked as unsuccessful");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index efd84d3..6d50154 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -44,27 +44,27 @@ public enum JobManagerDeploymentStatus {
 
     public Duration rescheduleAfter(
             FlinkDeployment flinkDeployment, FlinkOperatorConfiguration 
operatorConfiguration) {
-        int rescheduleAfterSec;
+        Duration rescheduleAfter;
         switch (this) {
             case DEPLOYING:
-                rescheduleAfterSec = 
operatorConfiguration.getProgressCheckIntervalSeconds();
+                rescheduleAfter = 
operatorConfiguration.getProgressCheckInterval();
                 break;
             case READY:
-                rescheduleAfterSec =
+                rescheduleAfter =
                         SavepointUtils.savepointInProgress(flinkDeployment)
-                                ? 
operatorConfiguration.getProgressCheckIntervalSeconds()
-                                : 
operatorConfiguration.getReconcileIntervalSeconds();
+                                ? 
operatorConfiguration.getProgressCheckInterval()
+                                : operatorConfiguration.getReconcileInterval();
                 break;
             case MISSING:
             case ERROR:
-                rescheduleAfterSec = 
operatorConfiguration.getReconcileIntervalSeconds();
+                rescheduleAfter = operatorConfiguration.getReconcileInterval();
                 break;
             case DEPLOYED_NOT_READY:
-                rescheduleAfterSec = 
operatorConfiguration.getRestApiReadyDelaySeconds();
+                rescheduleAfter = operatorConfiguration.getRestApiReadyDelay();
                 break;
             default:
                 throw new RuntimeException("Unknown status: " + this);
         }
-        return Duration.ofSeconds(rescheduleAfterSec);
+        return rescheduleAfter;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 70390f3..1caa512 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -21,7 +21,7 @@ import 
org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
@@ -50,9 +50,8 @@ public class SavepointUtils {
 
     public static boolean gracePeriodEnded(
             FlinkOperatorConfiguration configuration, SavepointInfo 
savepointInfo) {
-        int gracePeriod = 
configuration.getSavepointTriggerGracePeriodSeconds();
+        Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
         long triggerTimestamp = savepointInfo.getTriggerTimestamp();
-        return (System.currentTimeMillis() - triggerTimestamp)
-                > TimeUnit.SECONDS.toMillis(gracePeriod);
+        return (System.currentTimeMillis() - triggerTimestamp) > 
gracePeriod.toMillis();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3c1af8c..6093a8e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -52,12 +52,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +73,13 @@ public class FlinkDeploymentControllerTest {
 
     private final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
     private final FlinkOperatorConfiguration operatorConfiguration =
-            new FlinkOperatorConfiguration(1, 2, 3, 4, null, 
Collections.emptySet());
+            new FlinkOperatorConfiguration(
+                    Duration.ofSeconds(1),
+                    Duration.ofSeconds(2),
+                    Duration.ofSeconds(3),
+                    Duration.ofSeconds(4),
+                    null,
+                    Collections.emptySet());
 
     private TestingFlinkService flinkService;
     private FlinkDeploymentController testController;
@@ -96,9 +102,7 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, 
TestUtils.createEmptyContext());
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                Optional.of(
-                        TimeUnit.SECONDS.toMillis(
-                                
operatorConfiguration.getProgressCheckIntervalSeconds())),
+                
Optional.of(operatorConfiguration.getProgressCheckInterval().toMillis()),
                 updateControl.getScheduleDelay());
 
         // Validate reconciliation status
@@ -111,17 +115,13 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                Optional.of(
-                        TimeUnit.SECONDS.toMillis(
-                                
operatorConfiguration.getRestApiReadyDelaySeconds())),
+                
Optional.of(operatorConfiguration.getRestApiReadyDelay().toMillis()),
                 updateControl.getScheduleDelay());
 
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                Optional.of(
-                        TimeUnit.SECONDS.toMillis(
-                                
operatorConfiguration.getReconcileIntervalSeconds())),
+                
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
                 updateControl.getScheduleDelay());
 
         // Validate job status
@@ -171,9 +171,7 @@ public class FlinkDeploymentControllerTest {
                         appCluster, 
TestUtils.createContextWithFailedJobManagerDeployment());
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                Optional.of(
-                        TimeUnit.SECONDS.toMillis(
-                                
operatorConfiguration.getReconcileIntervalSeconds())),
+                
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
                 updateControl.getScheduleDelay());
 
         RecordedRequest recordedRequest = mockServer.getLastRequest();
@@ -242,9 +240,7 @@ public class FlinkDeploymentControllerTest {
                         appCluster, 
TestUtils.createContextWithInProgressDeployment());
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                Optional.of(
-                        TimeUnit.SECONDS.toMillis(
-                                
operatorConfiguration.getReconcileIntervalSeconds())),
+                
Optional.of(operatorConfiguration.getReconcileInterval().toMillis()),
                 updateControl.getScheduleDelay());
 
         RecordedRequest recordedRequest = mockServer.getLastRequest();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 226ed8e..8a90cf2 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** {@link JobObserver} unit tests. */
 public class JobObserverTest {
@@ -96,11 +97,15 @@ public class JobObserverTest {
         assertEquals(
                 deployment.getMetadata().getName(),
                 deployment.getStatus().getJobStatus().getJobName());
-        assertEquals(
+        assertTrue(
                 
Long.valueOf(deployment.getStatus().getJobStatus().getUpdateTime())
-                        .compareTo(
-                                
Long.valueOf(deployment.getStatus().getJobStatus().getStartTime())),
-                1);
+                                .compareTo(
+                                        Long.valueOf(
+                                                deployment
+                                                        .getStatus()
+                                                        .getJobStatus()
+                                                        .getStartTime()))
+                        >= 0);
 
         // Test listing failure
         flinkService.clear();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 1721671..1b79649 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -33,6 +33,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
@@ -87,13 +88,29 @@ public class SessionObserverTest {
                 .setLastReconciledSpec(deployment.getSpec());
 
         FlinkOperatorConfiguration allNsConfig =
-                new FlinkOperatorConfiguration(1, 2, 3, 4, null, 
Collections.emptySet());
+                new FlinkOperatorConfiguration(
+                        Duration.ofSeconds(1),
+                        Duration.ofSeconds(2),
+                        Duration.ofSeconds(3),
+                        Duration.ofSeconds(4),
+                        null,
+                        Collections.emptySet());
         FlinkOperatorConfiguration specificNsConfig =
                 new FlinkOperatorConfiguration(
-                        1, 2, 3, 4, null, 
Set.of(deployment.getMetadata().getNamespace()));
+                        Duration.ofSeconds(1),
+                        Duration.ofSeconds(2),
+                        Duration.ofSeconds(3),
+                        Duration.ofSeconds(4),
+                        null,
+                        Set.of(deployment.getMetadata().getNamespace()));
         FlinkOperatorConfiguration multipleNsConfig =
                 new FlinkOperatorConfiguration(
-                        1, 2, 3, 4, null, 
Set.of(deployment.getMetadata().getNamespace(), "ns"));
+                        Duration.ofSeconds(1),
+                        Duration.ofSeconds(2),
+                        Duration.ofSeconds(3),
+                        Duration.ofSeconds(4),
+                        null,
+                        Set.of(deployment.getMetadata().getNamespace(), "ns"));
 
         Deployment k8sDeployment = new Deployment();
         k8sDeployment.setSpec(new DeploymentSpec());
diff --git a/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml 
b/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
index 71ebacb..ae2fab8 100644
--- a/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
+++ b/helm/flink-operator/conf/flink-operator-config/flink-conf.yaml
@@ -16,10 +16,10 @@
 # limitations under the License.
 
################################################################################
 
-# operator.reconciler.reschedule.interval.sec: 60
-# operator.observer.rest-ready.delay.sec: 10
-# operator.observer.progress-check.interval.sec: 10
-# operator.observer.savepoint.trigger.grace-period.sec: 10
+# operator.reconciler.reschedule.interval: 60 s
+# operator.observer.rest-ready.delay: 10 s
+# operator.observer.progress-check.interval: 10 s
+# operator.observer.savepoint.trigger.grace-period: 10 s
 
 # metrics.reporter.slf4j.factory.class: 
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
 # metrics.reporter.slf4j.interval: 5 MINUTE
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index 56df161..8b90d6b 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -57,8 +57,8 @@ operatorConfiguration:
     metrics.reporter.slf4j.factory.class: 
org.apache.flink.metrics.slf4j.Slf4jReporterFactory
     metrics.reporter.slf4j.interval: 5 MINUTE
 
-    operator.reconciler.reschedule.interval.sec: 15
-    operator.observer.progress-check.interval.sec: 5
+    operator.reconciler.reschedule.interval: 15 s
+    operator.observer.progress-check.interval: 5 s
   log4j2.properties: |+
     rootLogger.level = DEBUG
 

Reply via email to