zentol commented on a change in pull request #19121:
URL: https://github.com/apache/flink/pull/19121#discussion_r829068119
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -496,12 +498,14 @@ public boolean releaseAndTryRemove(String key) throws Exception {
Objects.requireNonNull(configMap.getData().remove(key));
}
return Optional.of(configMap);
+ } else {
+ stateHandleDoesNotExist.set(true);
}
return Optional.empty();
})
.thenCompose(
updated -> {
- if (updated && stateHandleRefer.get() != null) {
+ if (stateHandleRefer.get() != null) {
Review comment:
Why are we now assuming that `updated == true` whenever the ref isn't null?
Even if that currently holds, it seems to rely on implementation details of
the KubeClient.
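To illustrate the concern, here is a minimal standalone sketch of keeping the explicit check. `UpdatedGuardSketch` and `shouldDiscard` are hypothetical names that are not part of the PR, and the state handle is modelled as a plain `Object` rather than the real Flink types.

```java
import java.util.concurrent.atomic.AtomicReference;

public class UpdatedGuardSketch {

    // Hypothetical helper: only treat the state as safe to discard when the
    // update was reported as applied AND a handle was captured beforehand.
    // Checking both keeps the caller independent of whether the client
    // happens to guarantee "ref set implies updated".
    public static boolean shouldDiscard(
            boolean updated, AtomicReference<Object> stateHandleRefer) {
        return updated && stateHandleRefer.get() != null;
    }
}
```

With this shape, a handle that was captured while the update itself was not applied does not trigger a discard.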
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -464,13 +465,14 @@ public StringResourceVersion exists(String key) throws Exception {
* It returns the {@link RetrievableStateHandle} stored under the given state node if any.
*
* @param key Key to be removed from ConfigMap
- * @return True if the state handle is removed successfully
+ * @return True if the state handle isn't listed anymore.
* @throws Exception if removing the key or discarding the state failed
*/
@Override
public boolean releaseAndTryRemove(String key) throws Exception {
checkNotNull(key, "Key in ConfigMap.");
- final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>();
+ final AtomicReference<StateObject> stateHandleRefer = new AtomicReference<>();
+ final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);
Review comment:
:/
It's not really nice that we need this secondary channel to get results out
of the update function.
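One possible alternative, sketched under heavy assumptions: have the removal step return a small result object instead of writing to `AtomicReference`/`AtomicBoolean` side channels. A plain `Map` stands in for the ConfigMap data here, `RemoveResult` and `removeKey` are hypothetical names, and nothing below reflects the actual KubeClient update API.

```java
import java.util.Map;
import java.util.Optional;

public class UpdateResultSketch {

    /** Hypothetical result type describing what the removal step observed. */
    public static final class RemoveResult {
        public final boolean existed;
        public final Optional<String> removedValue;

        RemoveResult(boolean existed, Optional<String> removedValue) {
            this.existed = existed;
            this.removedValue = removedValue;
        }
    }

    // Removes the key and reports both facts through the return value, so the
    // caller does not need extra mutable holders to learn what happened.
    public static RemoveResult removeKey(Map<String, String> data, String key) {
        String removed = data.remove(key);
        return new RemoveResult(removed != null, Optional.ofNullable(removed));
    }
}
```

Whether something like this is feasible depends on whether the update function's signature can carry a richer result, which this sketch does not attempt to answer.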
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
##########
@@ -220,14 +221,23 @@ public void testGlobalCleanupWithNonExistName() throws Exception {
.globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor())
.join();
- try {
- removeFuture.get(timeout, TimeUnit.MILLISECONDS);
- fail(
- "We should get an expected timeout because we are removing a non-existed job graph.");
- } catch (TimeoutException ex) {
- // expected
- }
- assertThat(removeFuture.isDone(), is(false));
+ removeFuture.join();
+ assertThat(removeFuture.isDone(), is(true));
Review comment:
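There's no way for this to fail: `removeFuture.join()` only returns once the
future has completed, so `isDone()` is always true at this point.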
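A small standalone sketch of why the assertion is vacuous. `JoinThenIsDoneSketch` and `isDoneAfterJoin` are hypothetical names for illustration only; the sketch uses nothing but `java.util.concurrent.CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;

public class JoinThenIsDoneSketch {

    // join() blocks until the future completes, either normally or
    // exceptionally. In the exceptional case it throws an unchecked
    // CompletionException or CancellationException, which is swallowed here
    // so that isDone() can still be inspected afterwards.
    public static boolean isDoneAfterJoin(CompletableFuture<?> future) {
        try {
            future.join();
        } catch (RuntimeException e) {
            // The future finished exceptionally; it is still "done".
        }
        return future.isDone();
    }
}
```

Since the result is the same for successful and failed futures, an assertion on the future's outcome (for example, that it did not complete exceptionally) would say more than asserting `isDone()`.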
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -803,6 +803,34 @@ public void testRemove() throws Exception {
};
}
+ @Test
+ public void testRemoveOfNonExistingState() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThat(store.releaseAndTryRemove(key), is(true));
+ assertThat(store.getAllAndLock().size(), is(0));
+
+ // State should also be discarded.
Review comment:
Shouldn't the count then be 1?