dmvk commented on a change in pull request #18306:
URL: https://github.com/apache/flink/pull/18306#discussion_r784971098



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -58,4 +63,24 @@ public String toString() {
                 + localStateDirectories
                 + '}';
     }
+
+    /**
+     * An {@link NullOfDirProviderException} is thrown when the {@link
+     * LocalRecoveryDirectoryProvider} is null.
+     */
+    @Internal
+    public static class NullOfDirProviderException extends RuntimeException {
+        public NullOfDirProviderException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public NullOfDirProviderException(String message) {
+            super(message);
+        }
+
+        public NullOfDirProviderException() {
+            this(
+                    "When task local recovery is enabled, the 
NullPointerException of LocalRecoveryDirectoryProvider should not happen.");
+        }
+    }

Review comment:
       ```suggestion
   
       public static Supplier<IllegalStateException> localRecoveryNotEnabled() {
           return () ->
                   new IllegalStateException(
                           "Getting a LocalRecoveryDirectoryProvider is only 
supported with the local recovery enabled. This is a bug and should be 
reported.");
       }
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -44,9 +48,10 @@ public boolean isLocalRecoveryEnabled() {
         return localRecoveryEnabled;
     }
 
-    @Nonnull
-    public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() {
-        return localStateDirectories;
+    public Optional<LocalRecoveryDirectoryProvider> 
getLocalStateDirectoryProvider() {
+        return localStateDirectories == null
+                ? Optional.empty()
+                : Optional.of(localStateDirectories);

Review comment:
       ```suggestion
           return Optional.ofNullable(localStateDirectories);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -44,9 +48,10 @@ public boolean isLocalRecoveryEnabled() {
         return localRecoveryEnabled;

Review comment:
       ```suggestion
           return localStateDirectories != null;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -31,11 +35,11 @@
     private final boolean localRecoveryEnabled;
 
     /** Encapsulates the root directories and the subtask-specific path. */
-    @Nonnull private final LocalRecoveryDirectoryProvider 
localStateDirectories;
+    @Nullable private final LocalRecoveryDirectoryProvider 
localStateDirectories;
 
     public LocalRecoveryConfig(
             boolean localRecoveryEnabled,

Review comment:
       I think we might want to get rid of this parameter completely. This two 
parameters having the same meaning thing, just leads into some weird 
implementations.
   
   For example we can now easily get rid of the 
`SavepointLocalRecoveryProvider` from the state processor API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to