zentol commented on a change in pull request #19191:
URL: https://github.com/apache/flink/pull/19191#discussion_r831381056
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
##########
@@ -50,6 +55,131 @@ public void testGetAndIncrement() throws Exception {
};
}
+ @Test
+ public void testShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+
checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+
checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(true));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testIdempotentShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+
+
checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testShutdownFailureDueToMissingConfigMap() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ // deleting the ConfigMap from outside of the
CheckpointIDCounter while
+ // still using the counter (which is stored as an
entry in the
+ // ConfigMap) causes an unexpected failure which
we want to simulate
+ // here
+
flinkKubeClient.deleteConfigMap(LEADER_CONFIGMAP_NAME);
+
+ assertThrows(
+ "The shutdown should fail due to the
illegal state of the test setup.",
Review comment:
This assertion message doesn't really provide any value. That some part of the
test setup is supposed to make this fail is obvious.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
##########
@@ -62,10 +69,70 @@ public void testShutdownRemovesState() throws Exception {
CuratorFramework client = zookeeper.getClient();
assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
- counter.shutdown(JobStatus.FINISHED);
+ counter.shutdown(JobStatus.FINISHED).join();
+ assertThat(client.checkExists().forPath(counter.getPath())).isNull();
+ }
+
+ @Test
+ public void testIdempotentShutdown() throws Exception {
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+ counter.start();
+
+ CuratorFramework client = zookeeper.getClient();
+ counter.shutdown(JobStatus.FINISHED).join();
+
+ // shutdown shouldn't fail due to missing path
+ counter.shutdown(JobStatus.FINISHED).join();
assertThat(client.checkExists().forPath(counter.getPath())).isNull();
}
+ @Test
+ public void testShutdownWithFailureDueToMissingConnection() throws
Exception {
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+ counter.start();
+
+ cleanAndStopZooKeeperIfRunning();
+
+ assertThat(counter.shutdown(JobStatus.FINISHED))
+ .as("The shutdown should fail because of the client connection
being dropped.")
+ .failsWithin(Duration.ofHours(1));
Review comment:
Not really a fan of using such dummy timeouts. I'd rather call get() on
the future and simplify the whole thing. We shouldn't force ourselves to use
the CompletableFuture assertions if they are incompatible with our testing
philosophy.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
##########
@@ -101,22 +110,66 @@ public void start() throws Exception {
}
@Override
- public void shutdown(JobStatus jobStatus) throws Exception {
+ public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
synchronized (startStopLock) {
if (isStarted) {
LOG.info("Shutting down.");
- sharedCount.close();
+ try {
+ sharedCount.close();
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
client.getConnectionStateListenable().removeListener(connectionStateListener);
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Removing {} from ZooKeeper", counterPath);
-
client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+ final CompletableFuture<Void> deletionFuture = new
CompletableFuture<>();
+ try {
+ client.delete()
+ .inBackground(
+ (curatorFramework, curatorEvent) ->
+ handleDeletionOfCounterPath(
+ curatorEvent,
deletionFuture))
+ .forPath(counterPath);
Review comment:
We could create and return the deletion future within the try block.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
##########
@@ -50,6 +55,131 @@ public void testGetAndIncrement() throws Exception {
};
}
+ @Test
+ public void testShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+
checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ checkpointIDCounter.setCount(100L);
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.get(Constants.CHECKPOINT_COUNTER_KEY),
+ is("100"));
+
+
checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(true));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testIdempotentShutdown() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesCheckpointIDCounter
checkpointIDCounter =
+ new KubernetesCheckpointIDCounter(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+ checkpointIDCounter.start();
+
+ assertThat(
+ getLeaderConfigMap()
+ .getData()
+
.containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+ is(false));
Review comment:
I'd prefer this to follow the pattern in the ZK test where we start out
with data in the config map, and then initiate the shutdown twice. As is, the
shutdown method could fail catastrophically if called twice in a row, rendering
it non-idempotent for our purposes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]