autophagy commented on a change in pull request #18913: URL: https://github.com/apache/flink/pull/18913#discussion_r815739863
########## File path: flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.ConfigGroup; +import org.apache.flink.annotation.docs.ConfigGroups; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.TextElement; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import java.time.Duration; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * {@link ConfigOption} collection for the configuring repeatable cleanup of resource cleanup after + * a job reached a globally-terminated state. + * + * <p>This implementation copies {@link RestartStrategyOptions} to provide similar user experience. + * FLINK-26359 is created to clean this up. 
+ */ +@PublicEvolving +@ConfigGroups( + groups = { + @ConfigGroup( + name = "ExponentialDelayCleanupStrategy", + keyPrefix = "cleanup-strategy.exponential-delay"), + @ConfigGroup( + name = "FixedDelayCleanupStrategy", + keyPrefix = "cleanup-strategy.fixed-delay"), + }) +public class CleanupOptions { + + private static final String CLEANUP_STRATEGY_PARAM = "cleanup-strategy"; + + public static final String FIXED_DELAY_LABEL = "fixed-delay"; + public static final String EXPONENTIAL_DELAY_LABEL = "exponential-delay"; + + public static final Set<String> NONE_PARAM_VALUES = ImmutableSet.of("none", "disable", "off"); + + private static String createParameterPrefix(String paramGroupKey) { + return CLEANUP_STRATEGY_PARAM + "." + paramGroupKey + "."; + } + + private static String createFixedDelayParameterPrefix(String parameter) { + return createParameterPrefix(FIXED_DELAY_LABEL) + parameter; + } + + private static String createExponentialBackoffParameterPrefix(String parameter) { + return createParameterPrefix(EXPONENTIAL_DELAY_LABEL) + parameter; + } + + public static String extractAlphaNumericCharacters(String paramName) { + return paramName.replaceAll("[^a-zA-Z0-9]", ""); + } + + public static final ConfigOption<String> CLEANUP_STRATEGY = + ConfigOptions.key(CLEANUP_STRATEGY_PARAM) + .stringType() + .defaultValue(EXPONENTIAL_DELAY_LABEL) + .withDescription( + Description.builder() + .text( + "Defines the cleanup strategy to use in case of cleanup failures.") + .linebreak() + .text("Accepted values are:") + .list( + text( + NONE_PARAM_VALUES.stream() + .map(ignored -> "%s") + .collect( + Collectors.joining( + ", ")) + + ": Cleanup is only performed once. 
No retry " + + "will be initiated in case of failure.", + NONE_PARAM_VALUES.stream() + .map(TextElement::code) + .collect(Collectors.toList()) + .toArray( + new TextElement + [NONE_PARAM_VALUES + .size()])), + text( + "%s, %s: Cleanups will run in a static interval up " Review comment: I think maybe something like "Cleanup attempts will be separated by a static interval/delay" would be a little clearer here, since "run in a static interval" sounds a little ambiguous as to what it's actually setting ########## File path: flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.ConfigGroup; +import org.apache.flink.annotation.docs.ConfigGroups; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.TextElement; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import java.time.Duration; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * {@link ConfigOption} collection for the configuring repeatable cleanup of resource cleanup after Review comment: ```suggestion * {@link ConfigOption} collection for the configuration of repeatable cleanup of resources after ``` ########## File path: flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.ConfigGroup; +import org.apache.flink.annotation.docs.ConfigGroups; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.TextElement; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import java.time.Duration; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * {@link ConfigOption} collection for the configuring repeatable cleanup of resource cleanup after + * a job reached a globally-terminated state. + * + * <p>This implementation copies {@link RestartStrategyOptions} to provide similar user experience. + * FLINK-26359 is created to clean this up. + */ +@PublicEvolving +@ConfigGroups( + groups = { + @ConfigGroup( + name = "ExponentialDelayCleanupStrategy", + keyPrefix = "cleanup-strategy.exponential-delay"), + @ConfigGroup( + name = "FixedDelayCleanupStrategy", + keyPrefix = "cleanup-strategy.fixed-delay"), + }) +public class CleanupOptions { + + private static final String CLEANUP_STRATEGY_PARAM = "cleanup-strategy"; + + public static final String FIXED_DELAY_LABEL = "fixed-delay"; + public static final String EXPONENTIAL_DELAY_LABEL = "exponential-delay"; + + public static final Set<String> NONE_PARAM_VALUES = ImmutableSet.of("none", "disable", "off"); + + private static String createParameterPrefix(String paramGroupKey) { + return CLEANUP_STRATEGY_PARAM + "." 
+ paramGroupKey + "."; + } + + private static String createFixedDelayParameterPrefix(String parameter) { + return createParameterPrefix(FIXED_DELAY_LABEL) + parameter; + } + + private static String createExponentialBackoffParameterPrefix(String parameter) { + return createParameterPrefix(EXPONENTIAL_DELAY_LABEL) + parameter; + } + + public static String extractAlphaNumericCharacters(String paramName) { + return paramName.replaceAll("[^a-zA-Z0-9]", ""); + } + + public static final ConfigOption<String> CLEANUP_STRATEGY = + ConfigOptions.key(CLEANUP_STRATEGY_PARAM) + .stringType() + .defaultValue(EXPONENTIAL_DELAY_LABEL) + .withDescription( + Description.builder() + .text( + "Defines the cleanup strategy to use in case of cleanup failures.") + .linebreak() + .text("Accepted values are:") + .list( + text( + NONE_PARAM_VALUES.stream() + .map(ignored -> "%s") + .collect( + Collectors.joining( + ", ")) + + ": Cleanup is only performed once. No retry " + + "will be initiated in case of failure.", + NONE_PARAM_VALUES.stream() + .map(TextElement::code) + .collect(Collectors.toList()) + .toArray( + new TextElement + [NONE_PARAM_VALUES + .size()])), + text( + "%s, %s: Cleanups will run in a static interval up " + + "to the point where the cleanup is considered " + + "successful or a set amount of retries is reached.", + code(FIXED_DELAY_LABEL), + code( + extractAlphaNumericCharacters( + FIXED_DELAY_LABEL))), + text( + "%s, %s: Exponential delay restart strategy " + + "triggers the cleanup with an exponentially " + + "increasing delay up to the point where the " + + "cleanup succeeded or a set amount of retries " + + "is reached.", + code(EXPONENTIAL_DELAY_LABEL), + code( + extractAlphaNumericCharacters( + EXPONENTIAL_DELAY_LABEL)))) + .text( + "The default configuration relies on an exponentially delayed " + + "retry strategy with the given default values.") + .build()); + + public static final ConfigOption<Integer> CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS = + 
ConfigOptions.key(createFixedDelayParameterPrefix("attempts")) + .intType() + .defaultValue(1) + .withDescription( + Description.builder() + .text( + "The number of times that Flink retries the cleanup " + + "before giving up if %s has been set to %s.", + code(CLEANUP_STRATEGY_PARAM), code(FIXED_DELAY_LABEL)) + .build()); + + public static final ConfigOption<Duration> CLEANUP_STRATEGY_FIXED_DELAY_DELAY = + ConfigOptions.key(createFixedDelayParameterPrefix("delay")) + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription( + Description.builder() + .text( + "Amount of time, Flink waits before re-triggering the " Review comment: ```suggestion "Amount of time that Flink waits before re-triggering the " ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.configuration.CleanupOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy; +import org.apache.flink.util.concurrent.FixedRetryStrategy; +import org.apache.flink.util.concurrent.RetryStrategy; + +import java.time.Duration; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * {@code CleanupRetryStrategyFactory} is used to create the {@link RetryStrategy} for the {@link + * DispatcherResourceCleanerFactory}. It extracts all the necessary information from the passed + * {@link Configuration}. + * + * @see CleanupOptions + */ +public enum CleanupRetryStrategyFactory { + INSTANCE; Review comment: This is more of a question than a suggestion, but just so that I understand this more: is this a pattern to use an enum with a single instance to create a factory, rather than a class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
