gyfora commented on code in PR #657:
URL: https://github.com/apache/flink-kubernetes-operator/pull/657#discussion_r1316788012


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java:
##########
@@ -873,6 +891,58 @@ public void cancelJob(
         assertEquals(1, rescaleCounter.get());
     }
 
+    @Test
+    public void testScaleWithJobGraphKeptInHAMetadata() throws Exception {
+        final String jobGraphKey = Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate();
+        final String jobGraphVal = "job-graph-data";
+        createHAConfigMapWithData(
+                kubernetesClient,
+                "ha-configmap",
+                TEST_NAMESPACE,
+                TEST_DEPLOYMENT_NAME,
+                Collections.singletonMap(jobGraphKey, jobGraphVal));
+        final Map<String, String> configMapLabels =
+                KubernetesUtils.getConfigMapLabels(
+                        TEST_DEPLOYMENT_NAME, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+        assertEquals(
+                jobGraphVal,
+                kubernetesClient
+                        .configMaps()
+                        .inNamespace(TEST_NAMESPACE)
+                        .withLabels(configMapLabels)
+                        .list()
+                        .getItems()
+                        .get(0)
+                        .getData()
+                        .get(jobGraphKey));
+
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        getJobSpec(deployment).setParallelism(100);
+        reconciler.reconcile(deployment, context);
+        assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
+        assertFalse(deployment.getStatus().getReconciliationStatus().scalingInProgress());
+        getJobSpec(deployment).setState(JobState.SUSPENDED);
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), new JobVertexID() + ":1");
+        reconciler.reconcile(deployment, context);
+        assertEquals(

Review Comment:
   Sorry, but I still don't fully understand what is happening here.
   1. You first change the parallelism (in native mode that is not a `SCALE` change; currently only the parallelism overrides are) and then you assert that the job is suspended. What is the point? Could we simply remove this part of the test?
   2. You then set the state to suspended and add parallelism overrides. That will not scale the job, it will suspend it. We naturally don't delete things on suspend, only on recovery, so there is nothing to test at this point. But even once you restore, the job graph should be deleted, because this is not a `SCALE` change given that you changed the state as well. A scale-only change would leave the state untouched; see the sketch below.
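
   Just to illustrate what I mean by a scale-only change, here is a rough, untested sketch reusing the helpers already in this test class (`getJobSpec`, `reconciler.reconcile`, `verifyAndSetRunningJobsToStatus`, `getReconciledJobState`); the final assertion is my assumption about the in-place scaling path, not something I have verified:
   ```java
   // Hypothetical sketch: a pure SCALE change only touches the parallelism overrides
   // and leaves the desired job state untouched.
   FlinkDeployment deployment = TestUtils.buildApplicationCluster();
   getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
   reconciler.reconcile(deployment, context);
   verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

   // Change only PipelineOptions.PARALLELISM_OVERRIDES; do NOT set the state to
   // SUSPENDED, otherwise the spec diff becomes an upgrade instead of a scale.
   deployment
           .getSpec()
           .getFlinkConfiguration()
           .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), new JobVertexID() + ":2");
   reconciler.reconcile(deployment, context);

   // Assumption: with in-place scaling there is no suspend/restore cycle, so the job
   // stays RUNNING and the HA job graph entry would not be touched here.
   assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
   ```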


