XComp commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r815924963
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
+ */
+ private Optional<KubernetesConfigMap> addEntry(
Review comment:
yes, I agree. Especially, the case you mentioned at the end crossed my
mind as well when writing the comment above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]