autophagy commented on a change in pull request #18913:
URL: https://github.com/apache/flink/pull/18913#discussion_r815739863



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * {@link ConfigOption} collection for the configuring repeatable cleanup of 
resource cleanup after
+ * a job reached a globally-terminated state.
+ *
+ * <p>This implementation copies {@link RestartStrategyOptions} to provide 
similar user experience.
+ * FLINK-26359 is created to clean this up.
+ */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(
+                    name = "ExponentialDelayCleanupStrategy",
+                    keyPrefix = "cleanup-strategy.exponential-delay"),
+            @ConfigGroup(
+                    name = "FixedDelayCleanupStrategy",
+                    keyPrefix = "cleanup-strategy.fixed-delay"),
+        })
+public class CleanupOptions {
+
+    private static final String CLEANUP_STRATEGY_PARAM = "cleanup-strategy";
+
+    public static final String FIXED_DELAY_LABEL = "fixed-delay";
+    public static final String EXPONENTIAL_DELAY_LABEL = "exponential-delay";
+
+    public static final Set<String> NONE_PARAM_VALUES = 
ImmutableSet.of("none", "disable", "off");
+
+    private static String createParameterPrefix(String paramGroupKey) {
+        return CLEANUP_STRATEGY_PARAM + "." + paramGroupKey + ".";
+    }
+
+    private static String createFixedDelayParameterPrefix(String parameter) {
+        return createParameterPrefix(FIXED_DELAY_LABEL) + parameter;
+    }
+
+    private static String createExponentialBackoffParameterPrefix(String 
parameter) {
+        return createParameterPrefix(EXPONENTIAL_DELAY_LABEL) + parameter;
+    }
+
+    public static String extractAlphaNumericCharacters(String paramName) {
+        return paramName.replaceAll("[^a-zA-Z0-9]", "");
+    }
+
+    public static final ConfigOption<String> CLEANUP_STRATEGY =
+            ConfigOptions.key(CLEANUP_STRATEGY_PARAM)
+                    .stringType()
+                    .defaultValue(EXPONENTIAL_DELAY_LABEL)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines the cleanup strategy to 
use in case of cleanup failures.")
+                                    .linebreak()
+                                    .text("Accepted values are:")
+                                    .list(
+                                            text(
+                                                    NONE_PARAM_VALUES.stream()
+                                                                    
.map(ignored -> "%s")
+                                                                    .collect(
+                                                                            
Collectors.joining(
+                                                                               
     ", "))
+                                                            + ": Cleanup is 
only performed once. No retry "
+                                                            + "will be 
initiated in case of failure.",
+                                                    NONE_PARAM_VALUES.stream()
+                                                            
.map(TextElement::code)
+                                                            
.collect(Collectors.toList())
+                                                            .toArray(
+                                                                    new 
TextElement
+                                                                            
[NONE_PARAM_VALUES
+                                                                               
     .size()])),
+                                            text(
+                                                    "%s, %s: Cleanups will run 
in a static interval up "

Review comment:
       I think maybe something like "Cleanup attempts will be separated by a 
static interval/delay" would be a little clearer here, since "run in a static 
interval" sounds a little ambigious as to what its actually setting

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * {@link ConfigOption} collection for the configuring repeatable cleanup of 
resource cleanup after

Review comment:
       ```suggestion
    * {@link ConfigOption} collection for the configuration of repeatable 
cleanup of resources after
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * {@link ConfigOption} collection for the configuring repeatable cleanup of 
resource cleanup after
+ * a job reached a globally-terminated state.
+ *
+ * <p>This implementation copies {@link RestartStrategyOptions} to provide 
similar user experience.
+ * FLINK-26359 is created to clean this up.
+ */
+@PublicEvolving
+@ConfigGroups(
+        groups = {
+            @ConfigGroup(
+                    name = "ExponentialDelayCleanupStrategy",
+                    keyPrefix = "cleanup-strategy.exponential-delay"),
+            @ConfigGroup(
+                    name = "FixedDelayCleanupStrategy",
+                    keyPrefix = "cleanup-strategy.fixed-delay"),
+        })
+public class CleanupOptions {
+
+    private static final String CLEANUP_STRATEGY_PARAM = "cleanup-strategy";
+
+    public static final String FIXED_DELAY_LABEL = "fixed-delay";
+    public static final String EXPONENTIAL_DELAY_LABEL = "exponential-delay";
+
+    public static final Set<String> NONE_PARAM_VALUES = 
ImmutableSet.of("none", "disable", "off");
+
+    private static String createParameterPrefix(String paramGroupKey) {
+        return CLEANUP_STRATEGY_PARAM + "." + paramGroupKey + ".";
+    }
+
+    private static String createFixedDelayParameterPrefix(String parameter) {
+        return createParameterPrefix(FIXED_DELAY_LABEL) + parameter;
+    }
+
+    private static String createExponentialBackoffParameterPrefix(String 
parameter) {
+        return createParameterPrefix(EXPONENTIAL_DELAY_LABEL) + parameter;
+    }
+
+    public static String extractAlphaNumericCharacters(String paramName) {
+        return paramName.replaceAll("[^a-zA-Z0-9]", "");
+    }
+
+    public static final ConfigOption<String> CLEANUP_STRATEGY =
+            ConfigOptions.key(CLEANUP_STRATEGY_PARAM)
+                    .stringType()
+                    .defaultValue(EXPONENTIAL_DELAY_LABEL)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines the cleanup strategy to 
use in case of cleanup failures.")
+                                    .linebreak()
+                                    .text("Accepted values are:")
+                                    .list(
+                                            text(
+                                                    NONE_PARAM_VALUES.stream()
+                                                                    
.map(ignored -> "%s")
+                                                                    .collect(
+                                                                            
Collectors.joining(
+                                                                               
     ", "))
+                                                            + ": Cleanup is 
only performed once. No retry "
+                                                            + "will be 
initiated in case of failure.",
+                                                    NONE_PARAM_VALUES.stream()
+                                                            
.map(TextElement::code)
+                                                            
.collect(Collectors.toList())
+                                                            .toArray(
+                                                                    new 
TextElement
+                                                                            
[NONE_PARAM_VALUES
+                                                                               
     .size()])),
+                                            text(
+                                                    "%s, %s: Cleanups will run 
in a static interval up "
+                                                            + "to the point 
where the cleanup is considered "
+                                                            + "successful or a 
set amount of retries is reached.",
+                                                    code(FIXED_DELAY_LABEL),
+                                                    code(
+                                                            
extractAlphaNumericCharacters(
+                                                                    
FIXED_DELAY_LABEL))),
+                                            text(
+                                                    "%s, %s: Exponential delay 
restart strategy "
+                                                            + "triggers the 
cleanup with an exponentially "
+                                                            + "increasing 
delay up to the point where the "
+                                                            + "cleanup 
succeeded or a set amount of retries "
+                                                            + "is reached.",
+                                                    
code(EXPONENTIAL_DELAY_LABEL),
+                                                    code(
+                                                            
extractAlphaNumericCharacters(
+                                                                    
EXPONENTIAL_DELAY_LABEL))))
+                                    .text(
+                                            "The default configuration relies 
on an exponentially delayed "
+                                                    + "retry strategy with the 
given default values.")
+                                    .build());
+
+    public static final ConfigOption<Integer> 
CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS =
+            ConfigOptions.key(createFixedDelayParameterPrefix("attempts"))
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The number of times that Flink 
retries the cleanup "
+                                                    + "before giving up if %s 
has been set to %s.",
+                                            code(CLEANUP_STRATEGY_PARAM), 
code(FIXED_DELAY_LABEL))
+                                    .build());
+
+    public static final ConfigOption<Duration> 
CLEANUP_STRATEGY_FIXED_DELAY_DELAY =
+            ConfigOptions.key(createFixedDelayParameterPrefix("delay"))
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Amount of time, Flink waits 
before re-triggering the "

Review comment:
       ```suggestion
                                               "Amount of time that Flink waits 
before re-triggering the "
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.configuration.CleanupOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
+import org.apache.flink.util.concurrent.FixedRetryStrategy;
+import org.apache.flink.util.concurrent.RetryStrategy;
+
+import java.time.Duration;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@code CleanupRetryStrategyFactory} is used to create the {@link 
RetryStrategy} for the {@link
+ * DispatcherResourceCleanerFactory}. It extracts all the necessary 
information from the passed
+ * {@link Configuration}.
+ *
+ * @see CleanupOptions
+ */
+public enum CleanupRetryStrategyFactory {
+    INSTANCE;

Review comment:
       This is more of a question than a suggestion, but just so that I 
understand this more: is this a pattern to use an enum with a single pattern to 
create a factory, rather than a class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to