XComp commented on code in PR #20590:
URL: https://github.com/apache/flink/pull/20590#discussion_r951437549


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java:
##########
@@ -660,6 +661,12 @@ private Optional<KubernetesConfigMap> addEntry(
                                 "Unable to remove the marked as deleting entry.");
                     }
                 } else {
+                    // It could happen the fabric8 k8s client retries a transaction that has

Review Comment:
   ```suggestion
                       // It could happen that the k8s client retries a transaction that has
   ```
   nit: There's no need to mention the specific k8s client here. Naming it risks
   the comment becoming outdated if we decide to change the internally used client,
   and `fabric8` doesn't add any value here.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -1143,4 +1190,31 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry(
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    private CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(

Review Comment:
   ```suggestion
       private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
   ```
   We could even make this a static utility method, since it doesn't use any
   instance state.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -1143,4 +1190,31 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry(
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    private CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function,
+            KubernetesConfigMap leaderConfigMap) {
+        final AtomicInteger retryNum = new AtomicInteger(0);

Review Comment:
   ```suggestion
           final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
   ```
   nit: IMHO, the name is a bit misleading here: we're not counting the retries
   (which would be `1` in the end) but the number of times the passed `function`
   is called.
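
To illustrate the difference between retries and callback invocations, here is a minimal, self-contained sketch using only the JDK. It is not Flink's `FutureUtils.retry`; the class `InvocationCountSketch` and the `retry` helper are hypothetical stand-ins that only mimic the "fail on the first call, succeed on the second" shape of the test helper.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class InvocationCountSketch {

    // Hypothetical stand-in for a retrying helper (NOT Flink's FutureUtils.retry):
    // calls the operation until it succeeds or maxAttempts is exhausted.
    static <T> T retry(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    // Returns how often the callback ran when only its first invocation fails.
    static int countInvocations() {
        final AtomicInteger callbackInvocationCount = new AtomicInteger(0);
        retry(
                () -> {
                    if (callbackInvocationCount.incrementAndGet() == 1) {
                        throw new IllegalStateException(
                                "Expected exception to simulate unstable operation");
                    }
                    return true;
                },
                3);
        return callbackInvocationCount.get();
    }

    public static void main(String[] args) {
        System.out.println(countInvocations());
    }
}
```

In this sketch the counter ends at 2 even though only one retry happened, which is why a name like `callbackInvocationCount` describes the value more accurately than `retryNum`.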



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java:
##########
@@ -649,6 +649,7 @@ private Optional<KubernetesConfigMap> addEntry(
             KubernetesConfigMap configMap, String key, byte[] serializedStateHandle)
             throws Exception {
         final String content = configMap.getData().get(key);
+        final String newContent = toBase64(serializedStateHandle);

Review Comment:
   ```suggestion
           final String oldBase64Content = configMap.getData().get(key);
           final String newBase64Content = toBase64(serializedStateHandle);
   ```
   nit: We might want to give the previously used variable a more explicit name
   as well, so that the two values are easy to tell apart.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -1143,4 +1190,31 @@ private TestingLongStateHandleHelper.LongStateHandle addDeletingEntry(
         configMap.getData().put(key, deleting);
         return state;
     }
+
+    private CompletableFuture<Boolean> retryWithFirstFailedK8sOperation(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function,
+            KubernetesConfigMap leaderConfigMap) {
+        final AtomicInteger retryNum = new AtomicInteger(0);
+        final CompletableFuture<Boolean> result =
+                FutureUtils.retry(
+                        () ->
+                                CompletableFuture.supplyAsync(
+                                        () -> {
+                                            retryNum.incrementAndGet();
+                                            function.apply(leaderConfigMap);
+                                            if (retryNum.get() == 1) {
+                                                throw new KubernetesClientException(
+                                                        "unstable k8s client operation");

Review Comment:
   ```suggestion
                                                           "Expected exception to simulate unstable k8s client operation");
   ```
   Can we make the exception message more explicit? This improves the
   readability of the logs produced during test execution (i.e. it makes it
   easier to differentiate actually problematic exceptions from the ones that
   are introduced deliberately as part of certain tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
