xintongsong commented on a change in pull request #11854: URL: https://github.com/apache/flink/pull/11854#discussion_r412724015
########## File path: flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.externalresource; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Contains the information of an external resource. Exposed through {@link org.apache.flink.api.common.functions.RuntimeContext}. Review comment: ```suggestion * Contains the information of an external resource. ``` I think how the class is exposed should be irrelevant to the class definition. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriver.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; + +import java.util.Set; + +/** + * Driver which takes the responsibility to manage and provide the information of external resource. + */ +public interface ExternalResourceDriver { Review comment: I think this is part of the public APIs. It should probably be moved to `flink-core` and annotated `@PublicEnvolving`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriverFactory.java ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.configuration.Configuration; + +/** + * Factory for {@link ExternalResourceDriver}. Instantiate a driver with configuration; + */ +public interface ExternalResourceDriverFactory { Review comment: Same here. I think this is part of the public APIs. It should probably be moved to `flink-core` and annotated `@PublicEnvolving`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriverFactory.java ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.configuration.Configuration; + +/** + * Factory for {@link ExternalResourceDriver}. Instantiate a driver with configuration; Review comment: ```suggestion * Factory for {@link ExternalResourceDriver}. Instantiate a driver with configuration. ``` ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST = Review comment: It might be better to use `CongfigOption<List<String>>` for this option. However, that means also changing the delimiter from `,` to `;`. See `YarnConfigOptions#SHIP_DIRECTORIES` for example. ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST = + key("external-resources") + .stringType() + .noDefaultValue() + .withDescription("An optional list of used external resources with delimiter \",\". If configured, Flink " + + "would check if the relevant configs exist for that resource. At the ResourceManager side, Flink will forward " + + "the request to the underlying external resource manager. At the TaskExecutor side, Flink will launch the " + + "corresponding ExternalResourceDriver."); + + public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS = + key("external-resource.<resourceName>.driver-factory.class") Review comment: I think we'd better use `<abc_xyz>` rather than `<abcXyz>` for the variables in config keys. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, Throwable throwable) { // Utility methods // ------------------------------------------------------------------------ + private void setExternalResourceRequestIfPossible(Map<String, Long> externalResources) throws ResourceManagerException { + for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) { + if (externalResource.getValue() == 0) { + continue; + } + + if (!isYarnResourceTypesAvailable()) { + throw new ResourceManagerException("Could not request extended resource because the underlying YARN does not support it."); + } + + try { + final Class<?> resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS); + final Method setResourceInfoMethod = Resource.class.getMethod("setResourceInformation", String.class, resourceInfoClass); + final Method resourceInfoNewInstance = resourceInfoClass.getMethod("newInstance", String.class, long.class); Review comment: I think these codes should be outside the loop. No need to create the `Class` and `Method` for each external resources. Only the invocation of methods need to stay inside the loop. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { Review comment: Let's add a private constructor to make this class non-instanceable. ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST = + key("external-resources") + .stringType() + .noDefaultValue() + .withDescription("An optional list of used external resources with delimiter \",\". If configured, Flink " + + "would check if the relevant configs exist for that resource. At the ResourceManager side, Flink will forward " + + "the request to the underlying external resource manager. At the TaskExecutor side, Flink will launch the " + + "corresponding ExternalResourceDriver."); + + public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS = + key("external-resource.<resourceName>.driver-factory.class") + .stringType() + .noDefaultValue() + .withDescription("The ExternalResourceDriverFactory class for a specific external resource. At the TaskExecutor side, " + + "ExternalResourceDriver will be instantiated by this class with configuration. You must copy the relevant jar into " + + "the `/plugins/<resourceName>` folder of your Flink distribution."); + + public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT = + key("external-resource.<resourceName>.amount") + .longType() + .defaultValue(0L) + .withDescription("The amount for a specific external resource per TaskExecutor."); + + public static final ConfigOption<String> EXTERNAL_RESOURCE_YARN_CONFIG_KEY = + key("external-resource.<resourceName>.yarn.config-key") + .stringType() + .noDefaultValue() + .withDescription("Optional config which defines the configuration key of that external resource in Yarn. " + + "If you want the Flink to request the external resource from Yarn, you need to explicitly set this key. " + + "Only valid for Yarn mode."); + + public static final ConfigOption<String> EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY = + key("external-resource.<resourceName>.kubernetes.config-key") + .stringType() + .noDefaultValue() + .withDescription("Optional config which defines the configuration key of that external resource in Kubernetes. " + + "If you want the Flink to request the external resource from Kubernetes, you need to explicitly set this key. " + + "Only valid for Kubernetes mode."); + + public static final ConfigOption<String> REPORTER_CONFIG_PARAMETER = + key("external-resource.{resourceName}.param.{params}") Review comment: ```suggestion key("external-resource.<resourceName>.param.<paramName>") ``` ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { Review comment: Generally, I think the descriptions of the configuration options should be further polished. The description should focus more on the semantics of the specific option and how should it be used. Terms and concepts mentioned should be explained in detail, e.g., "configuration key in Yarn/Kubernetes", "parameter". No need to provide too many backgrounds about how the external resource framework works ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final String configKey = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + suffix, ""); + final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0); + if (amount != 0 && !StringUtils.isNullOrWhitespaceOnly(configKey)) { Review comment: Is negative amount allowed? Moreover, we should check not only the sanity of configured values, but also the existence of them. It's probably ok for the `configKey` being absent, but we should throw at least a warning if the `amount` is not configured. ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST = Review comment: JavaDoc is missing. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; + +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest { + + @Test + public void testGetExternalResourceList() { + final Configuration config = new Configuration(); + config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "foo,bar"); + + final Set<String> resourceSet = ExternalResourceUtils.getExternalResourceList(config); + + assertThat(resourceSet.size(), is(2)); + assertTrue(resourceSet.contains("foo")); + assertTrue(resourceSet.contains("bar")); + } + + @Test + public void testGetExternalResourceConfigMap() { Review comment: It would be better to also add test cases for: - configKey is not specified (or empty) - illegal amount value - multiple resources, w/ or w/o config key conflicts ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, Throwable throwable) { // Utility methods // ------------------------------------------------------------------------ + private void setExternalResourceRequestIfPossible(Map<String, Long> externalResources) throws ResourceManagerException { + for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) { + if (externalResource.getValue() == 0) { + continue; + } Review comment: Can we keep all this kind of checks and filters in `ExternalResourceUtils#getExternalResourceConfigMap`? It would be better to have the util method returns only the valid values that can be used directly. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, Throwable throwable) { // Utility methods // ------------------------------------------------------------------------ + private void setExternalResourceRequestIfPossible(Map<String, Long> externalResources) throws ResourceManagerException { + for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) { + if (externalResource.getValue() == 0) { + continue; + } + + if (!isYarnResourceTypesAvailable()) { + throw new ResourceManagerException("Could not request extended resource because the underlying YARN does not support it."); + } + + try { + final Class<?> resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS); + final Method setResourceInfoMethod = Resource.class.getMethod("setResourceInformation", String.class, resourceInfoClass); + final Method resourceInfoNewInstance = resourceInfoClass.getMethod("newInstance", String.class, long.class); + setResourceInfoMethod.invoke( + this.resource, + externalResource.getKey(), + resourceInfoNewInstance.invoke(null, externalResource.getKey(), externalResource.getValue())); + log.info("Successfully request the external resource {} with amount {}.", externalResource.getKey(), externalResource.getValue()); + + } catch (Exception e) { + throw new ResourceManagerException("Error in setting the external resource.", e); + } Review comment: I would try to keep the reflections out of `YarnResourceManager` if possible. A good example is `RegisterApplicationMasterResponseReflector`, which handles the version related reflections and always return something safe to `YarnResourceManager`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceConstants.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +/** + * This class contains all constants for the external resource framework. + */ +public class ExternalResourceConstants { Review comment: For better deduplication, we can even have a util class for generating the configuration keys: ``` public class ExternalResourceConfigUtils { ; public static String genericKeyWithSuffix(String suffix) { return keyWithResourceNameAndSuffix("<resource_name>", suffix); } public static String keyWithResourceNameAndSuffix(String resourceName, String suffix) { return String.format("external-resource.%s.%s", resourceName, suffix); } } ``` And then the config options (take amount as an example): ``` key(ExternalResourceConfigUtils.keyWithResourceNameAndSuffix(AMOUNT_SUFFIX)) ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceConstants.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +/** + * This class contains all constants for the external resource framework. + */ +public class ExternalResourceConstants { Review comment: I would suggest to place these constants in `ExternalResourceOptions`. In this way, you can use these constants for assembling the configuration option keys. This guarantees the keys shown in the user docs are consistent with those actually used in runtime. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final String configKey = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + suffix, ""); + final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0); Review comment: Deprecated methods used. We might construct local `ConfigOption` variables first. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { Review comment: This method is very confusing. Neither the method name nor the JavaDoc explained that it returns a map from the deployment specific key to the amount. I would suggest to name it simply `getExternalResources`, with more details in JavaDoc explains that the returned keys should be used for deployment specific container request, and values should be the amount. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, Throwable throwable) { // Utility methods // ------------------------------------------------------------------------ + private void setExternalResourceRequestIfPossible(Map<String, Long> externalResources) throws ResourceManagerException { + for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) { + if (externalResource.getValue() == 0) { + continue; + } + + if (!isYarnResourceTypesAvailable()) { + throw new ResourceManagerException("Could not request extended resource because the underlying YARN does not support it."); + } Review comment: 1. Why is this inside the loop? Shouldn't this check happens before the loop? 2. Throwing the exception here will prevent RM from being constructed. It does not make sense to fail the resource manager when the Yarn version does not support external resources. I think it's good enough to log a warning message and simply ignore the external resources. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { Review comment: Also, this class should provide deployment specific utils methods, e.g., `getExternalResourcesForKubernetes`, `getExternalResourcesForYarn`. This helps hide the suffixes from to the callers, and save us from checking the sanity of suffixes. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final String configKey = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + suffix, ""); + final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0); + if (amount != 0 && !StringUtils.isNullOrWhitespaceOnly(configKey)) { + externalResourceConfigs.put(configKey, amount); Review comment: Should `configKey` be trimmed before used? ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST = + key("external-resources") + .stringType() + .noDefaultValue() + .withDescription("An optional list of used external resources with delimiter \",\". If configured, Flink " + + "would check if the relevant configs exist for that resource. At the ResourceManager side, Flink will forward " + + "the request to the underlying external resource manager. At the TaskExecutor side, Flink will launch the " + + "corresponding ExternalResourceDriver."); + + public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS = Review comment: Better to add `@SuppressWarnings("unused")`, and explain in JavaDoc that it is intentionally included into user docs while unused. Same for other options. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java ########## @@ -106,6 +108,10 @@ public double getTaskManagerCPU() { return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue(); } + public Map<String, Long> getTaskManagerExternalResources() { + return ExternalResourceUtils.getExternalResourceConfigMap(getFlinkConfiguration(), ExternalResourceConstants.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX); Review comment: Should this be derived in the constructor? To avoid potential redundant computations. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, Throwable throwable) { // Utility methods // ------------------------------------------------------------------------ + private void setExternalResourceRequestIfPossible(Map<String, Long> externalResources) throws ResourceManagerException { + for (Map.Entry<String, Long> externalResource: externalResources.entrySet()) { + if (externalResource.getValue() == 0) { + continue; + } + + if (!isYarnResourceTypesAvailable()) { + throw new ResourceManagerException("Could not request extended resource because the underlying YARN does not support it."); + } + + try { + final Class<?> resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS); + final Method setResourceInfoMethod = Resource.class.getMethod("setResourceInformation", String.class, resourceInfoClass); + final Method resourceInfoNewInstance = resourceInfoClass.getMethod("newInstance", String.class, long.class); + setResourceInfoMethod.invoke( + this.resource, + externalResource.getKey(), + resourceInfoNewInstance.invoke(null, externalResource.getKey(), externalResource.getValue())); + log.info("Successfully request the external resource {} with amount {}.", externalResource.getKey(), externalResource.getValue()); + + } catch (Exception e) { + throw new ResourceManagerException("Error in setting the external resource.", e); + } Review comment: BTW, do we know which hadoop version is the external resources supported? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); Review comment: If we change the type of `ExternalResourceOptions#EXTERNAL_RESOURCE_LIST` to `ConfigOption<List<String>>`, we can reuse `ConfigUtils#decodeListFromConfig` here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; + +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest { + + @Test + public void testGetExternalResourceList() { + final Configuration config = new Configuration(); + config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "foo,bar"); + + final Set<String> resourceSet = ExternalResourceUtils.getExternalResourceList(config); + + assertThat(resourceSet.size(), is(2)); + assertTrue(resourceSet.contains("foo")); + assertTrue(resourceSet.contains("bar")); + } + + @Test + public void testGetExternalResourceConfigMap() { + final Configuration config = new Configuration(); + final String suffix = ExternalResourceConstants.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX; Review comment: I would suggest to use an arbitrary suffix. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -527,6 +560,21 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { return new Tuple2<>(host, Integer.valueOf(port)); } + /** + * Check whether the underlying YARN support to set extended resources. + * In case of Hadoop 3.1+ or 2.10+, the ResourceInformation class should be available on the classpath. + * + * @return Whether we could set extended resources. + */ + private static boolean isYarnResourceTypesAvailable() { + try { + Class.forName(RESOURCE_INFO_CLASS); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } Review comment: Same here. Let's keep the reflections in a separated class. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + // regex pattern to split the defined external resources + private static final Pattern resourceListPattern = Pattern.compile("\\s*,\\s*"); + + /** + * Get the enabled external resource list from configuration. + */ + @VisibleForTesting + static final Set<String> getExternalResourceList(Configuration config) { + return resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "")) + .filter(r -> !r.isEmpty()) // splitting an empty string results in an empty string on jdk9+ + .collect(Collectors.toSet()); + } + + /** + * Get the config key and value of all of enabled external resources from the underlying external resource manager. + * Each external resource manager(Yarn / Kubernetes) have their own configuration suffix. + */ + public static Map<String, Long> getExternalResourceConfigMap(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceList(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final String configKey = config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + suffix, ""); + final long amount = config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0); + if (amount != 0 && !StringUtils.isNullOrWhitespaceOnly(configKey)) { + externalResourceConfigs.put(configKey, amount); Review comment: What if the key already exist? Is it possible that the user specified the same key for two different resources? ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java ########## @@ -113,10 +124,12 @@ public void testMainContainerResourceRequirements() { final Map<String, Quantity> requests = resourceRequirements.getRequests(); assertEquals(Double.toString(TASK_MANAGER_CPU), requests.get("cpu").getAmount()); assertEquals(TOTAL_PROCESS_MEMORY + "Mi", requests.get("memory").getAmount()); + assertEquals(Long.toString(RESOURCE_AMOUNT), requests.get(RESOURCE_CONFIG_KEY).getAmount()); final Map<String, Quantity> limits = resourceRequirements.getLimits(); assertEquals(Double.toString(TASK_MANAGER_CPU), limits.get("cpu").getAmount()); assertEquals(TOTAL_PROCESS_MEMORY + "Mi", limits.get("memory").getAmount()); + assertEquals(Long.toString(RESOURCE_AMOUNT), limits.get(RESOURCE_CONFIG_KEY).getAmount()); Review comment: Let's add another test case for the external resources. It would be good to have each test case verifies one thing only. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
