Guosmilesmile commented on code in PR #14243:
URL: https://github.com/apache/iceberg/pull/14243#discussion_r2435079281


##########
docs/docs/flink-maintenance.md:
##########
@@ -378,6 +378,8 @@ These keys are used in SQL (SET or table WITH options) and are applicable when w
 | `flink-maintenance.lock.zookeeper.connection-timeout-ms` | Connection timeout (ms) | `15000` |
 | `flink-maintenance.lock.zookeeper.max-retries` | Max retries | `3` |
 | `flink-maintenance.lock.zookeeper.base-sleep-ms` | Base sleep between retries (ms) | `3000` |
+| `flink-maintenance.lock.zookeeper.max-sleep-ms`          | Maximum sleep time (in milliseconds) between retries. Caps the exponential backoff delay.  | `10000` |

Review Comment:
   Maximum sleep time (ms) between retries.



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestZkLockFactory.java:
##########
@@ -51,4 +62,37 @@ public void after() throws IOException {
       zkTestServer.close();
     }
   }
+
+  @Test
+  void testRetryPolicyClassMapping() {
+    for (ZKRetryPolicies policy : ZKRetryPolicies.values()) {
+      ZkLockFactory factory =
+          new ZkLockFactory("localhost:2181", "test", 3000, 3000, 1000, 3, 2000, policy.name());
+
+      Object retryPolicy = factory.createRetryPolicy();
+      assertThat(retryPolicy).isNotNull();
+
+      String className = retryPolicy.getClass().getSimpleName();
+
+      switch (policy) {
+        case ONE_TIME:
+          assertThat(className).isEqualTo("RetryOneTime");
+          break;
+        case N_TIME:
+          assertThat(className).isEqualTo("RetryNTimes");
+          break;
+        case BOUNDED_EXPONENTIAL_BACKOFF:
+          assertThat(className).isEqualTo("BoundedExponentialBackoffRetry");
+          break;
+        case UNTIL_ELAPSED:
+          assertThat(className).isEqualTo("RetryUntilElapsed");
+          break;
+        case EXPONENTIAL_BACKOFF:
+          assertThat(className).isEqualTo("ExponentialBackoffRetry");
+          break;
+        default:
+          throw new IllegalStateException("Unhandled policy type: " + policy);

Review Comment:
   If we get an invalid policy name, do we fall back to ExponentialBackoffRetry
   by default?
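
For reference, the fallback being asked about can be sketched in isolation. This is a minimal sketch, not the PR's code: the enum constants are copied from the test above, while the class name `RetryPolicyFallbackSketch` and the helper `resolve` are hypothetical names introduced only for illustration. It relies on the standard behavior of `Enum.valueOf`, which is case-sensitive and throws `IllegalArgumentException` for an unknown name.

```java
public class RetryPolicyFallbackSketch {
  // Constants copied from the test's switch statement.
  enum ZKRetryPolicies {
    ONE_TIME,
    N_TIME,
    BOUNDED_EXPONENTIAL_BACKOFF,
    UNTIL_ELAPSED,
    EXPONENTIAL_BACKOFF
  }

  // Hypothetical helper: resolves a configured name, falling back to
  // EXPONENTIAL_BACKOFF when the name does not match any constant exactly.
  static ZKRetryPolicies resolve(String name) {
    try {
      return ZKRetryPolicies.valueOf(name);
    } catch (IllegalArgumentException e) {
      return ZKRetryPolicies.EXPONENTIAL_BACKOFF;
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve("N_TIME")); // prints N_TIME
    System.out.println(resolve("n_time")); // prints EXPONENTIAL_BACKOFF (valueOf is case-sensitive)
    System.out.println(resolve("bogus"));  // prints EXPONENTIAL_BACKOFF
  }
}
```

Two consequences of this pattern are worth noting: a lowercase or misspelled name silently selects the default instead of failing, and a `null` name is not covered by the `catch`, because `Enum.valueOf` throws `NullPointerException` in that case.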



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java:
##########
@@ -222,4 +240,32 @@ public void unlock() {
       }
     }
   }
+
+  RetryPolicy createRetryPolicy() {
+
+    ZKRetryPolicies policy;
+    try {
+      policy = ZKRetryPolicies.valueOf(retryPolicyName);
+    } catch (IllegalArgumentException e) {
+      policy = ZKRetryPolicies.EXPONENTIAL_BACKOFF;
+    }
+
+    switch (policy) {
+      case ONE_TIME:
+        return new RetryOneTime(baseSleepTimeMs);
+
+      case N_TIME:
+        return new RetryNTimes(maxRetries, baseSleepTimeMs);
+
+      case BOUNDED_EXPONENTIAL_BACKOFF:
+        return new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries);

Review Comment:
   Thanks, makes sense.
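
To illustrate what capping the backoff means for the `max-sleep-ms` setting, here is a deliberately simplified, deterministic sketch. It is not Curator's implementation (which, as far as I know, also randomizes the delay); the class `BoundedBackoffSketch` and the method `sleepMs` are hypothetical names. The inputs are the documented defaults from the table above: a base sleep of 3000 ms and a maximum sleep of 10000 ms.

```java
public class BoundedBackoffSketch {
  // Simplified model: double the base delay on every retry, then cap it
  // at maxSleepMs. retryCount is zero-based.
  static long sleepMs(long baseSleepMs, long maxSleepMs, int retryCount) {
    int shift = Math.min(retryCount, 30); // keep the shift small to avoid overflow
    long uncapped = baseSleepMs * (1L << shift);
    return Math.min(maxSleepMs, uncapped);
  }

  public static void main(String[] args) {
    // Defaults from the docs table: base-sleep-ms = 3000, max-sleep-ms = 10000.
    for (int retry = 0; retry < 3; retry++) {
      System.out.println("retry " + retry + ": " + sleepMs(3000, 10000, retry) + " ms");
    }
    // prints:
    // retry 0: 3000 ms
    // retry 1: 6000 ms
    // retry 2: 10000 ms
  }
}
```

In this model the third attempt would wait 12000 ms without the cap, so `max-sleep-ms` is what bounds it to 10000 ms.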



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

