gyfora commented on code in PR #337:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/337#discussion_r940988337
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -337,6 +339,77 @@ public void testEffectiveStatus() {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple3<JobID, String, Boolean>>
savepointFeature =
+ new CompletableFuture<>();
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ savepointFeature.complete(
+ new Tuple3<>(
+ ((SavepointTriggerMessageParameters)
parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory()
+ .get(),
+ ((SavepointTriggerRequestBody)
requestBody).isCancelJob()));
+ return CompletableFuture.completedFuture(new
TriggerResponse(new TriggerId()));
+ });
+ final CompletableFuture<Tuple3<JobID, Boolean, String>>
stopWithSavepointFuture =
+ new CompletableFuture<>();
+ testingClusterClient.setStopWithSavepointFunction(
Review Comment:
We should improve this to also capture the format type so we can test it.
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -337,6 +339,77 @@ public void testEffectiveStatus() {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple3<JobID, String, Boolean>>
savepointFeature =
+ new CompletableFuture<>();
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ savepointFeature.complete(
+ new Tuple3<>(
+ ((SavepointTriggerMessageParameters)
parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory()
+ .get(),
+ ((SavepointTriggerRequestBody)
requestBody).isCancelJob()));
Review Comment:
Should we also capture the savepoint format? I think that's what we want to
test here.
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -337,6 +339,77 @@ public void testEffectiveStatus() {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple3<JobID, String, Boolean>>
savepointFeature =
Review Comment:
typo: `savepointFeature` -> `savepointFuture`
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -337,6 +339,77 @@ public void testEffectiveStatus() {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple3<JobID, String, Boolean>>
savepointFeature =
+ new CompletableFuture<>();
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ savepointFeature.complete(
+ new Tuple3<>(
+ ((SavepointTriggerMessageParameters)
parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory()
+ .get(),
+ ((SavepointTriggerRequestBody)
requestBody).isCancelJob()));
+ return CompletableFuture.completedFuture(new
TriggerResponse(new TriggerId()));
+ });
+ final CompletableFuture<Tuple3<JobID, Boolean, String>>
stopWithSavepointFuture =
+ new CompletableFuture<>();
+ testingClusterClient.setStopWithSavepointFunction(
+ (id, advanceToEndOfEventTime, savepointDir) -> {
+ stopWithSavepointFuture.complete(
+ new Tuple3<>(id, advanceToEndOfEventTime,
savepointDir));
+ return CompletableFuture.completedFuture(savepointPath);
+ });
+
+ final FlinkService flinkService =
createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
savepointPath);
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ JobStatus jobStatus = deployment.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
+
+ jobStatus.setJobId(jobID.toString());
+ deployment.getStatus().setJobStatus(jobStatus);
+ flinkService.triggerSavepoint(
+ deployment.getStatus().getJobStatus().getJobId(),
+ SavepointTriggerType.MANUAL,
+ deployment.getStatus().getJobStatus().getSavepointInfo(),
+ new Configuration(configuration)
+ .set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+ SavepointFormatType.NATIVE));
+ assertTrue(savepointFeature.isDone());
+ assertEquals(jobID, savepointFeature.get().f0);
+ assertEquals(savepointPath, savepointFeature.get().f1);
+ assertFalse(savepointFeature.get().f2);
+
+ flinkService.cancelJob(
+ deployment,
+ UpgradeMode.SAVEPOINT,
+ new Configuration(configManager.getObserveConfig(deployment))
+ .set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+ SavepointFormatType.NATIVE));
+ assertTrue(stopWithSavepointFuture.isDone());
+ assertEquals(jobID, stopWithSavepointFuture.get().f0);
+ assertFalse(stopWithSavepointFuture.get().f1);
+ assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
Review Comment:
These are nice, but it would be better to also test the format.
##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -337,6 +339,77 @@ public void testEffectiveStatus() {
AbstractFlinkService.getEffectiveStatus(allFinished));
}
+ @Test
+ public void testNativeSavepointFormat() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration,
TestUtils.TEST_DEPLOYMENT_NAME);
+ final String savepointPath = "file:///path/of/svp";
+ final CompletableFuture<Tuple3<JobID, String, Boolean>>
savepointFeature =
+ new CompletableFuture<>();
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointPath);
+ testingClusterClient.setRequestProcessor(
+ (headers, parameters, requestBody) -> {
+ savepointFeature.complete(
+ new Tuple3<>(
+ ((SavepointTriggerMessageParameters)
parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory()
+ .get(),
+ ((SavepointTriggerRequestBody)
requestBody).isCancelJob()));
+ return CompletableFuture.completedFuture(new
TriggerResponse(new TriggerId()));
+ });
+ final CompletableFuture<Tuple3<JobID, Boolean, String>>
stopWithSavepointFuture =
+ new CompletableFuture<>();
+ testingClusterClient.setStopWithSavepointFunction(
+ (id, advanceToEndOfEventTime, savepointDir) -> {
+ stopWithSavepointFuture.complete(
+ new Tuple3<>(id, advanceToEndOfEventTime,
savepointDir));
+ return CompletableFuture.completedFuture(savepointPath);
+ });
+
+ final FlinkService flinkService =
createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
savepointPath);
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ JobStatus jobStatus = deployment.getStatus().getJobStatus();
+ jobStatus.setJobId(jobID.toHexString());
+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
+
+ jobStatus.setJobId(jobID.toString());
+ deployment.getStatus().setJobStatus(jobStatus);
+ flinkService.triggerSavepoint(
+ deployment.getStatus().getJobStatus().getJobId(),
+ SavepointTriggerType.MANUAL,
+ deployment.getStatus().getJobStatus().getSavepointInfo(),
+ new Configuration(configuration)
+ .set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
+ SavepointFormatType.NATIVE));
+ assertTrue(savepointFeature.isDone());
+ assertEquals(jobID, savepointFeature.get().f0);
+ assertEquals(savepointPath, savepointFeature.get().f1);
+ assertFalse(savepointFeature.get().f2);
Review Comment:
These are nice, but it would be better to also test the format.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]