XComp commented on code in PR #23531:
URL: https://github.com/apache/flink/pull/23531#discussion_r1374561765
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
+ "are, instead, marked as clean to indicate their state. In this "
+ "case, Flink no longer has ownership and the resources need to "
+ "be cleaned up by the user.");
+
+ @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+ public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+ ConfigOptions.key("job-result-store.ttl-clean-job-result")
+ .intType()
+ .defaultValue(10)
Review Comment:
Didn't we agree on infinity as the default value here? :thinking:
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
+ "are, instead, marked as clean to indicate their state. In this "
+ "case, Flink no longer has ownership and the resources need to "
+ "be cleaned up by the user.");
+
+ @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+ public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+ ConfigOptions.key("job-result-store.ttl-clean-job-result")
+ .intType()
Review Comment:
```suggestion
.durationType()
```
There is a `durationType()` for config parameters. That allows the user to
specify any time unit.
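To illustrate the point about units, here is a minimal, stdlib-only sketch. It is not Flink's own parsing code: `TtlSetting` and its `parse` helper are hypothetical, and the accepted unit labels are an assumption made for this example. It only shows why a duration-typed value is less ambiguous than a bare integer such as `10`, whose unit the user has to look up.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper, not part of Flink: turns values such as
// "500ms", "10 min" or "2h" into a java.time.Duration.
final class TtlSetting {
    private TtlSetting() {}

    static Duration parse(String raw) {
        String text = raw.trim().toLowerCase(Locale.ROOT);
        // Find the end of the leading run of digits.
        int split = 0;
        while (split < text.length() && Character.isDigit(text.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("Missing number in: " + raw);
        }
        long amount = Long.parseLong(text.substring(0, split));
        String unit = text.substring(split).trim();
        // A value without a unit is rejected on purpose: the unit must be explicit.
        switch (unit) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            case "d":
                return Duration.ofDays(amount);
            default:
                throw new IllegalArgumentException("Unknown or missing unit in: " + raw);
        }
    }
}
```

With a helper like this, `TtlSetting.parse("10 min")` and `TtlSetting.parse("600s")` describe the same retention time, which is the flexibility an integer-typed option cannot offer.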
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
+ "are, instead, marked as clean to indicate their state. In this "
+ "case, Flink no longer has ownership and the resources need to "
+ "be cleaned up by the user.");
+
+ @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+ public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
+ ConfigOptions.key("job-result-store.ttl-clean-job-result")
Review Comment:
The parameter name doesn't specify the embedded job result store as the target
component. Users of the file-based component might be surprised that this
option doesn't have any effect there.
But this makes me wonder: Shouldn't we also offer this functionality for
the file-based `JobResultStore`? WDYT? This would allow for automatic removal of
files after a certain amount of time. Sticking to a default value of
`infinity` wouldn't change the current behavior.
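As a rough illustration of why an `infinity` default keeps today's behavior, here is a stdlib-only sketch. It is not the actual `JobResultStore` code: the class `CleanResultTtlSketch`, its methods, and the use of plain `String` job IDs are all assumptions made for this example. Entries are only dropped when a finite TTL was configured.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch, not Flink's JobResultStore implementation.
final class CleanResultTtlSketch {
    // Job ID -> point in time at which the entry was marked as clean.
    private final Map<String, Instant> cleanSince = new ConcurrentHashMap<>();
    // null stands in for the "infinity" default: entries are never removed.
    private final Duration ttl;
    // Injected time source so that expiry can be exercised deterministically.
    private final Supplier<Instant> now;

    private CleanResultTtlSketch(Duration ttl, Supplier<Instant> now) {
        this.ttl = ttl;
        this.now = Objects.requireNonNull(now);
    }

    static CleanResultTtlSketch keepForever(Supplier<Instant> now) {
        return new CleanResultTtlSketch(null, now);
    }

    static CleanResultTtlSketch withTtl(Duration ttl, Supplier<Instant> now) {
        return new CleanResultTtlSketch(Objects.requireNonNull(ttl), now);
    }

    void markAsClean(String jobId) {
        cleanSince.put(jobId, now.get());
    }

    boolean hasCleanEntry(String jobId) {
        Instant since = cleanSince.get(jobId);
        if (since == null) {
            return false;
        }
        if (ttl != null && !now.get().isBefore(since.plus(ttl))) {
            // The TTL has elapsed: drop the entry lazily on access.
            cleanSince.remove(jobId, since);
            return false;
        }
        return true;
    }
}
```

A store created via `keepForever(...)` never forgets a clean entry, matching the current behavior, while `withTtl(Duration.ofMinutes(10), ...)` stops reporting an entry once ten minutes have passed. The same idea could apply to a file-based store by deleting the file instead of the map entry.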
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStoreOptions.java:
##########
@@ -65,4 +65,11 @@ public class JobResultStoreOptions {
+ "are, instead, marked as clean to indicate their state. In this "
+ "case, Flink no longer has ownership and the resources need to "
+ "be cleaned up by the user.");
+
+ @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY_JOB_RESULT_STORE)
+ public static final ConfigOption<Integer> TIME_TO_REMOVE_CLEAN_JOB_RESULT =
Review Comment:
I'm wondering whether we want to start a dev ML discussion on that one,
considering that configuration parameters are considered public API (I guess a
dedicated FLIP would be overkill here). WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -67,7 +79,7 @@ public boolean hasDirtyJobResultEntryInternal(JobID jobId) {
@Override
public boolean hasCleanJobResultEntryInternal(JobID jobId) {
- return cleanJobResults.containsKey(jobId);
+ return cleanJobResults.asMap().containsKey(jobId);
Review Comment:
```suggestion
return cleanJobResults.getIfPresent(jobId) != null;
```
nit: There is an interface method that appears to be a better fit for the use
case.