dmvk commented on a change in pull request #18306:
URL: https://github.com/apache/flink/pull/18306#discussion_r784971098
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -58,4 +63,24 @@ public String toString() {
+ localStateDirectories
+ '}';
}
+
+ /**
+ * An {@link NullOfDirProviderException} is thrown when the {@link
+ * LocalRecoveryDirectoryProvider} is null.
+ */
+ @Internal
+ public static class NullOfDirProviderException extends RuntimeException {
+ public NullOfDirProviderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NullOfDirProviderException(String message) {
+ super(message);
+ }
+
+ public NullOfDirProviderException() {
+ this(
+ "When task local recovery is enabled, the
NullPointerException of LocalRecoveryDirectoryProvider should not happen.");
+ }
+ }
Review comment:
```suggestion
public static Supplier<IllegalStateException> localRecoveryNotEnabled() {
return () ->
new IllegalStateException(
"Getting a LocalRecoveryDirectoryProvider is only
supported with the local recovery enabled. This is a bug and should be
reported.");
}
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -44,9 +48,10 @@ public boolean isLocalRecoveryEnabled() {
return localRecoveryEnabled;
}
- @Nonnull
- public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() {
- return localStateDirectories;
+ public Optional<LocalRecoveryDirectoryProvider>
getLocalStateDirectoryProvider() {
+ return localStateDirectories == null
+ ? Optional.empty()
+ : Optional.of(localStateDirectories);
Review comment:
```suggestion
return Optional.ofNullable(localStateDirectories);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -44,9 +48,10 @@ public boolean isLocalRecoveryEnabled() {
return localRecoveryEnabled;
Review comment:
```suggestion
return localStateDirectories != null;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
##########
@@ -31,11 +35,11 @@
private final boolean localRecoveryEnabled;
/** Encapsulates the root directories and the subtask-specific path. */
- @Nonnull private final LocalRecoveryDirectoryProvider
localStateDirectories;
+ @Nullable private final LocalRecoveryDirectoryProvider
localStateDirectories;
public LocalRecoveryConfig(
boolean localRecoveryEnabled,
Review comment:
I think we might want to get rid of this parameter completely. This two
parameters having the same meaning thing, just leads into some weird
implementations.
For example we can now easily get rid of the
`SavepointLocalRecoveryProvider` from the state processor API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]