This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c06094c [FLINK-28534] Spec change event is triggered twice per upgrade
c06094c is described below
commit c06094c09a37ba3033922c073eb8d08d5a66ac87
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Jul 13 12:44:42 2022 +0200
[FLINK-28534] Spec change event is triggered twice per upgrade
---
.../AbstractFlinkResourceReconciler.java | 14 ++--
.../controller/FlinkDeploymentControllerTest.java | 84 +++++++++++++++-------
.../TestingFlinkDeploymentController.java | 20 +++++-
3 files changed, 86 insertions(+), 32 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 903b4e1..b0d8abe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -134,12 +134,14 @@ public abstract class AbstractFlinkResourceReconciler<
return;
}
LOG.info(MSG_SPEC_CHANGED);
- eventRecorder.triggerEvent(
- cr,
- EventRecorder.Type.Normal,
- EventRecorder.Reason.SpecChanged,
- EventRecorder.Component.JobManagerDeployment,
- MSG_SPEC_CHANGED);
+ if (reconciliationStatus.getState() !=
ReconciliationState.UPGRADING) {
+ eventRecorder.triggerEvent(
+ cr,
+ EventRecorder.Type.Normal,
+ EventRecorder.Reason.SpecChanged,
+ EventRecorder.Component.JobManagerDeployment,
+ MSG_SPEC_CHANGED);
+ }
reconcileSpecChange(cr, observeConfig, deployConfig);
} else if (shouldRollBack(cr, observeConfig)) {
// Rollbacks are executed in two steps, we initiate it first then
return
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 54e0ace..866ecbd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -24,14 +24,12 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
@@ -39,6 +37,7 @@ import
org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -62,7 +61,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
@@ -91,7 +89,6 @@ public class FlinkDeploymentControllerTest {
public void setup() {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
-
testController =
new TestingFlinkDeploymentController(configManager,
kubernetesClient, flinkService);
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
@@ -125,7 +122,6 @@ public class FlinkDeploymentControllerTest {
.getProgressCheckInterval()
.toMillis()),
updateControl.getScheduleDelay());
-
// Validate reconciliation status
ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
appCluster.getStatus().getReconciliationStatus();
@@ -376,7 +372,6 @@ public class FlinkDeploymentControllerTest {
.put(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
"file:///flink-data/savepoints");
-
testController.reconcile(appCluster, context);
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
@@ -461,6 +456,7 @@ public class FlinkDeploymentControllerTest {
@ParameterizedTest
@EnumSource(FlinkVersion.class)
public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws
Exception {
+ testController.events().clear();
FlinkDeployment appCluster =
TestUtils.buildApplicationCluster(flinkVersion);
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -469,15 +465,32 @@ public class FlinkDeploymentControllerTest {
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals("s0", jobs.get(0).f0);
-
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.Submit,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.StatusChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
// Upgrade job
appCluster.getSpec().getJob().setParallelism(100);
UpdateControl<FlinkDeployment> updateControl =
testController.reconcile(appCluster, context);
+
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
assertEquals(0, updateControl.getScheduleDelay().get());
assertEquals(
JobState.SUSPENDED,
@@ -489,6 +502,11 @@ public class FlinkDeploymentControllerTest {
.getState());
updateControl = testController.reconcile(appCluster, context);
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.Submit,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
assertEquals(
JobManagerDeploymentStatus.DEPLOYING
.rescheduleAfter(appCluster,
configManager.getOperatorConfiguration())
@@ -502,14 +520,33 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.StatusChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
// Suspend job
appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
testController.reconcile(appCluster, context);
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
// Resume from empty state
appCluster.getSpec().getJob().setState(JobState.RUNNING);
testController.reconcile(appCluster, context);
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Submit,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
jobs = flinkService.listJobs();
assertEquals(1, jobs.size());
assertEquals(null, jobs.get(0).f0);
@@ -517,6 +554,13 @@ public class FlinkDeploymentControllerTest {
// Inject validation error in the middle of the upgrade
appCluster.getSpec().setRestartNonce(123L);
testController.reconcile(appCluster, context);
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
assertEquals(
JobState.SUSPENDED,
appCluster
@@ -527,8 +571,16 @@ public class FlinkDeploymentControllerTest {
.getState());
appCluster.getSpec().setLogConfiguration(Map.of("invalid", "conf"));
testController.reconcile(appCluster, TestUtils.createEmptyContext());
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.Submit,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
testController.reconcile(appCluster, context);
testController.reconcile(appCluster, context);
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.StatusChanged,
+
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -890,22 +942,4 @@ public class FlinkDeploymentControllerTest {
}
return args.stream();
}
-
- private static class StatusUpdateCounter
- implements BiConsumer<
- AbstractFlinkResource<?, FlinkDeploymentStatus>,
FlinkDeploymentStatus> {
- private int counter;
-
- @Override
- public void accept(
- AbstractFlinkResource<?, FlinkDeploymentStatus>
- flinkDeploymentStatusAbstractFlinkResource,
- FlinkDeploymentStatus flinkDeploymentStatus) {
- counter++;
- }
-
- public int getCount() {
- return counter;
- }
- }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 68a29f7..2def2b4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -30,6 +30,7 @@ import
org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
+import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -43,7 +44,9 @@ import
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.junit.jupiter.api.Assertions;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.function.BiConsumer;
/** A wrapper around {@link FlinkDeploymentController} used by unit tests. */
@@ -55,6 +58,8 @@ public class TestingFlinkDeploymentController
private FlinkDeploymentController flinkDeploymentController;
private StatusUpdateCounter statusUpdateCounter = new
StatusUpdateCounter();
+    private final EventCollector eventCollector = new EventCollector();
+
private EventRecorder eventRecorder;
private StatusRecorder statusRecorder;
@@ -62,7 +67,7 @@ public class TestingFlinkDeploymentController
FlinkConfigManager configManager,
KubernetesClient kubernetesClient,
TestingFlinkService flinkService) {
- eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
statusRecorder =
new StatusRecorder<>(
kubernetesClient,
@@ -117,6 +122,21 @@ public class TestingFlinkDeploymentController
throw new UnsupportedOperationException();
}
+    /** Collects each {@link Event} handed to it as the EventRecorder listener, in arrival order. */
+    private static class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
+        private final Queue<Event> events = new LinkedList<>();
+
+        @Override
+        public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) {
+            events.add(event);
+        }
+    }
+
+    /** Returns the live (mutable) queue of collected events so tests can poll or clear it. */
+    public Queue<Event> events() {
+        return eventCollector.events;
+    }
+
+
private static class StatusUpdateCounter
implements BiConsumer<
AbstractFlinkResource<?, FlinkDeploymentStatus>,
FlinkDeploymentStatus> {