tillrohrmann commented on a change in pull request #11854: URL: https://github.com/apache/flink/pull/11854#discussion_r425822239
########## File path: flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.externalresource; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Collection; +import java.util.Optional; + +/** + * Contains the information of an external resource. + */ +@PublicEvolving +public interface ExternalResourceInfo { + + /** + * Get the property indicated by the specified key. + * + * @param key of the required property + * @return optional property of given key Review comment: ```suggestion * @return an {@code Optional} containing the value associated to the key, or an empty {@code Optional} if no value has been stored under the given key ``` ########## File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java ########## @@ -18,42 +18,12 @@ package org.apache.flink.core.plugin; -import org.apache.flink.annotation.Internal; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; - -import javax.annotation.concurrent.ThreadSafe; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Iterator; /** - * Manager class and entry-point for the plugin mechanism in Flink. + * Interface for manager class and entry-point for the plugin mechanism in Flink. Review comment: Maybe: PluginManager is responsible for managing cluster plugins which are loaded using separate class loaders so that their dependencies don't interfere with Flink's dependencies. ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import java.util.List; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { + + /** The amount of the external resource per task executor. This is used as a suffix in an actual config. */ + public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount"; + + /** The driver factory class of the external resource to use. This is used as a suffix in an actual config. */ + public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class"; + + /** The suffix of custom config options' prefix for the external resource. */ + public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = "param."; + + /** The naming pattern of custom config options for the external resource. This is used as a suffix. */ + private static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>"; + + /** + * The prefix for all external resources configs. Has to be combined with a resource name and + * the configs mentioned below. + */ + private static final String EXTERNAL_RESOURCE_PREFIX = "external-resource"; + + /** + * List of the resource_name of all external resources with delimiter ";", e.g. "gpu;fpga" for two external resource gpu and fpga. + * The resource_name will be used to splice related config options for external resource. Only the resource_name defined here will + * go into effect in external resource framework. + */ + public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST = + key("external-resources") + .stringType() + .asList() + .defaultValues() + .withDescription("List of the <resource_name> of all external resources with delimiter \";\", e.g. \"gpu;fpga\" " + + "for two external resource gpu and fpga. The <resource_name> will be used to splice related config options for " + + "external resource. Only the <resource_name> defined here will go into effect by external resource framework."); + + /** + * Defines the factory class name for the external resource identified by >resource_name<. The factory will be used + * to instantiated the {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the TaskExecutor side. + * + * <p>It is intentionally included into user docs while unused. + */ + @SuppressWarnings("unused") + public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS = + key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX)) + .stringType() + .noDefaultValue() + .withDescription("Defines the factory class name for the external resource identified by <resource_name>. The " + + "factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side. For example, " + Review comment: ```suggestion "factory will be used to instantiate the ExternalResourceDriver at the TaskExecutor side. For example, " + ``` ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for external resources and external resource drivers. + */ +@PublicEvolving +public class ExternalResourceOptions { Review comment: Ok. Additionally I would propose to add some documentation on how to use the external resource framework. Maybe there we could add the above example to explain how things need to be configured. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { Review comment: I think it would be nice to add proper JavaDocs here and to explain the parameters. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceInfoProvider.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * Provide the information of external resources. + */ +public class ExternalResourceInfoProvider { Review comment: I would suggest to introduce an interface `ExternalResourceInfoProvider` and then to implement a `StaticExternalResourceInfoProvider` as you have done it here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.size(), is(1)); + assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class)); + } + + @Test + public void testNotConfiguredFactoryClass() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); Review comment: ```suggestion final PluginManager testingPluginManager = new TestingPluginManager(plugins); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingFailedExternalResourceDriverFactory.java ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; + +/** + * Implementation of {@link ExternalResourceDriverFactory} for testing purpose which failed to create Driver. Review comment: ```suggestion * Implementation of {@link ExternalResourceDriverFactory} for testing purpose which fails to create an {@link ExternalResourceDriver}. ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ########## @@ -1863,6 +1864,7 @@ private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, TaskManagerConfiguration.fromConfiguration(configuration, TM_RESOURCE_SPEC), haServices, taskManagerServices, + ExternalResourceInfoProvider.EMPTY_PROVIDER, Review comment: Maybe rename into ```suggestion ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); Review comment: ```suggestion final PluginManager testingPluginManager = new TestingPluginManager(plugins); ``` ########## File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginManagerImpl.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; + +/** + * Implementation of {@link PluginManager}. + */ +@Internal +@ThreadSafe +public class PluginManagerImpl implements PluginManager { Review comment: I would suggest to pick another name because there might be multiple `PluingManager` implementations. Maybe one could name this implementation `DefaultPluginManager` or `BasicPluginManager`. Admittedly, this name is not a lot better but I think it is a bit better than `Impl`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final ConfigOption<String> configKeyOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix)) + .stringType() + .noDefaultValue(); + final String configKey = config.getString(configKeyOption); + final Optional<Long> amountOpt = config.getOptional(amountOption); + + if (StringUtils.isNullOrWhitespaceOnly(configKey)) { + LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName); + continue; + } + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + continue; + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + continue; + } + + if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) { + LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt); + } else { + LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt); + } + } + + return externalResourceConfigs; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * Get the map of resource name and amount of all of enabled external resources. + */ + private static Map<String, Long> getExternalResourceAmountMap(Configuration config) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceAmountMap = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final Optional<Long> amountOpt = config.getOptional(amountOption); + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + } else { + externalResourceAmountMap.put(resourceName, amountOpt.get()); + } + } + + return externalResourceAmountMap; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * are mapped by its resource name. Review comment: ```suggestion * are mapped to its resource name. ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final ConfigOption<String> configKeyOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix)) + .stringType() + .noDefaultValue(); + final String configKey = config.getString(configKeyOption); + final Optional<Long> amountOpt = config.getOptional(amountOption); + + if (StringUtils.isNullOrWhitespaceOnly(configKey)) { + LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName); + continue; + } + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + continue; + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + continue; + } + + if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) { + LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt); + } else { + LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt); + } + } + + return externalResourceConfigs; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * Get the map of resource name and amount of all of enabled external resources. + */ + private static Map<String, Long> getExternalResourceAmountMap(Configuration config) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceAmountMap = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final Optional<Long> amountOpt = config.getOptional(amountOption); + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + } else { + externalResourceAmountMap.put(resourceName, amountOpt.get()); + } + } + + return externalResourceAmountMap; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * are mapped by its resource name. + */ + @VisibleForTesting + static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<String> driverClassOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX)) + .stringType() + .noDefaultValue(); + final String driverFactoryClassName = config.getString(driverClassOption); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.", + resourceName, driverClassOption.key()); + continue; + } + + ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName); + if (externalResourceDriverFactory != null) { + DelegatingConfiguration delegatingConfiguration = + new DelegatingConfiguration(config, ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX)); + try { + externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration)); + LOG.info("Add external resources driver for {}.", resourceName); + } catch (Exception e) { + LOG.warn("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e); + } + } else { + LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName); + } + } + + return externalResourceDrivers; + } + + /** + * Instantiate the {@link ExternalResourceInfoProvider} for all of enabled external resources. + */ + public static ExternalResourceInfoProvider getExternalResourceInfoProvider(Configuration config, PluginManager pluginManager) { Review comment: If we rename the `ExternalResourceInfoProvider` implementation into `StaticExternalResourceInfoProvider` I would suggest to rename this method into `createStaticExternalResourceInfoProvider`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final ConfigOption<String> configKeyOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix)) + .stringType() + .noDefaultValue(); + final String configKey = config.getString(configKeyOption); + final Optional<Long> amountOpt = config.getOptional(amountOption); + + if (StringUtils.isNullOrWhitespaceOnly(configKey)) { + LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName); + continue; + } + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + continue; + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + continue; + } + + if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) { + LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt); + } else { + LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt); + } + } + + return externalResourceConfigs; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * Get the map of resource name and amount of all of enabled external resources. + */ Review comment: The JavaDocs don't seem to be accurate. ########## File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginManagerImpl.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; + +/** + * Implementation of {@link PluginManager}. + */ +@Internal +@ThreadSafe +public class PluginManagerImpl implements PluginManager { Review comment: `PluginManagerTest` should be renamed into the name of the actual implementation. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); Review comment: Here we could introduce a `ExternalResourceOptions.getExternalResourceDriverFactoryConfigOption(RESOURCE_NAME_1)`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); Review comment: Maybe we could introduce dedicated names for `ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)`. For example, it could be `ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)`. For `ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX)` we could introduce something like `ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(RESOURCE_NAME_1, SUFFIX)`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.size(), is(1)); + assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class)); + } + + @Test + public void testNotConfiguredFactoryClass() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryPluginDoesNotExist() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(Collections.emptyMap()); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryFailedToCreateDriver() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingFailedExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingFailedExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourceInfoProvider() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils.getExternalResourceInfoProvider(config, testingPluginManagerPluginManager); + + assertNotNull(externalResourceInfoProvider.getExternalResources().get(RESOURCE_NAME_1)); + } + + @Test + public void testGetExternalResourceInfoProviderWithIllegalAmount() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), 0); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils.getExternalResourceInfoProvider(config, testingPluginManagerPluginManager); + + assertThat(externalResourceInfoProvider.getExternalResources().entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourceInfoProviderWithoutConfigAmount() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); Review comment: Same here. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.size(), is(1)); + assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class)); + } + + @Test + public void testNotConfiguredFactoryClass() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryPluginDoesNotExist() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(Collections.emptyMap()); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryFailedToCreateDriver() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingFailedExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingFailedExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourceInfoProvider() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils.getExternalResourceInfoProvider(config, testingPluginManagerPluginManager); Review comment: `getExternalResourceInfoProvider` would be a bit easier to test if the method did a little bit less. We could, for example, pass in the `externalResourceAmountMap` and `externalResourceDrivers`. We could also introduce an internal method which does the job. That way we don't have to set up all the `PluginManager` things. Moreover, the loading of `Drivers` and the creation of the `externalResourceAmountMap` has already been tested by other methods. ########## File path: flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java ########## @@ -43,7 +44,7 @@ import java.util.Set; /** - * Test for {@link PluginManager}. + * Test for {@link PluginManagerImpl}. */ public class PluginManagerTest extends PluginTestBase { Review comment: ```suggestion public class PluginManagerImplTest extends PluginTestBase { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.collections.IteratorUtils; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link ExternalResourceUtils} class. + */ +public class ExternalResourceUtilsTest extends TestLogger { + + private static final String RESOURCE_NAME_1 = "foo"; + private static final String RESOURCE_NAME_2 = "bar"; + private static final List<String> RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2); + private static final long RESOURCE_AMOUNT_1 = 2L; + private static final long RESOURCE_AMOUNT_2 = 1L; + private static final String RESOURCE_CONFIG_KEY_1 = "flink1"; + private static final String RESOURCE_CONFIG_KEY_2 = "flink2"; + private static final String SUFFIX = "flink.config-key"; + + @Test + public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() { + final Configuration config = new Configuration(); + final String resourceConfigKey = ""; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), resourceConfigKey); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithIllegalAmount() { + final Configuration config = new Configuration(); + final long resourceAmount = 0L; + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithoutConfigAmount() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourcesWithConflictConfigKey() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_1); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + // Only one of the config key would be kept. + assertThat(configMap.size(), is(1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + } + + @Test + public void testGetExternalResourcesWithMultipleExternalResource() { + final Configuration config = new Configuration(); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_2); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, SUFFIX), RESOURCE_CONFIG_KEY_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_2, SUFFIX), RESOURCE_CONFIG_KEY_2); + + final Map<String, Long> configMap = ExternalResourceUtils.getExternalResources(config, SUFFIX); + + assertThat(configMap.size(), is(2)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1)); + assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1)); + assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2)); + } + + @Test + public void testConstructExternalResourceDriversFromConfig() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.size(), is(1)); + assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class)); + } + + @Test + public void testNotConfiguredFactoryClass() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryPluginDoesNotExist() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(Collections.emptyMap()); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testFactoryFailedToCreateDriver() { + final Configuration config = new Configuration(); + final String driverFactoryClassName = TestingFailedExternalResourceDriverFactory.class.getName(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingFailedExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = ExternalResourceUtils.externalResourceDriversFromConfig(config, testingPluginManagerPluginManager); + + assertThat(externalResourceDrivers.entrySet(), is(empty())); + } + + @Test + public void testGetExternalResourceInfoProvider() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); + + final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils.getExternalResourceInfoProvider(config, testingPluginManagerPluginManager); + + assertNotNull(externalResourceInfoProvider.getExternalResources().get(RESOURCE_NAME_1)); + } + + @Test + public void testGetExternalResourceInfoProviderWithIllegalAmount() { + final Configuration config = new Configuration(); + final Map<Class<?>, Iterator<?>> plugins = new HashMap<>(); + plugins.put(ExternalResourceDriverFactory.class, IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory())); + final PluginManager testingPluginManagerPluginManager = new TestingPluginManager(plugins); + + final String driverFactoryClassName = TestingExternalResourceDriverFactory.class.getName(); + config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); + config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), 0); + config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), driverFactoryClassName); Review comment: Same here. Due to the fact that `getExternalResourceInfoProvider` does a lot of things, we need to set up all these things in order to test that we fail if we provide an illegal amount. I would be simpler if could directly provide the drivers and the amount map to a method which creates the `ExternalResourceInfoProvider`. ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** + * Tests for {@link ResourceInformationReflector}. + */ +public class ResourceInformationReflectorTest extends TestLogger { + + private static final String RESOURCE_NAME = "test"; + private static final long RESOURCE_VALUE = 1; + + @Test + public void testSetResourceInformationIfMethodPresent() { + final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName()); + final ResourceWithMethod resourceWithMethod = new ResourceWithMethod(); + resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE); + + assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME)); + assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), is(RESOURCE_NAME)); + assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), is(RESOURCE_VALUE)); + } + + @Test + public void testGetResourceInformationIfMethodPresent() { + final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName()); + final ResourceWithMethod resourceWithMethod = new ResourceWithMethod(); + resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE)); + + final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod); + assertThat(externalResources.size(), is(1)); + assertTrue(externalResources.containsKey(RESOURCE_NAME)); + assertThat(externalResources.get(RESOURCE_NAME), is(RESOURCE_VALUE)); + } + + @Test + public void testSetResourceInformationIfMethodAbsent() { + final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName()); + final ResourceWithMethod resourceWithMethod = new ResourceWithMethod(); + resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE); + + assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME)); + } + + @Test + public void testGetResourceInformationIfMethodAbsent() { + final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName()); + final ResourceWithMethod resourceWithMethod = new ResourceWithMethod(); + resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE)); + + final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod); + assertThat(externalResources.entrySet(), is(empty())); + } + + @Test + public void testSetAndGetExtendedResourcesWithYarnSupport() { + assumeTrue(HadoopUtils.isMinHadoopVersion(2, 10)); + + final Resource resource = Resource.newInstance(100, 1); + + ResourceInformationReflector.INSTANCE.setResourceInformation(resource, RESOURCE_NAME, RESOURCE_VALUE); + + final Map<String, Long> externalResourcesResult = ResourceInformationReflector.INSTANCE.getExternalResources(resource); + assertThat(externalResourcesResult.size(), is(1)); Review comment: Shouldn't this have size `2` for `cpu` and `mem`? Otherwise the starting index of the loop in `ResourceInformationReflector. getExternalResourcesUnSafe` does not make sense. I would suggest to run these tests with Hadoop 2.10 and Hadoop 3 in order to make sure that they are actually running. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final ConfigOption<String> configKeyOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix)) + .stringType() + .noDefaultValue(); + final String configKey = config.getString(configKeyOption); + final Optional<Long> amountOpt = config.getOptional(amountOption); + + if (StringUtils.isNullOrWhitespaceOnly(configKey)) { + LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName); + continue; + } + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + continue; + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + continue; + } + + if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) { + LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt); + } else { + LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt); + } + } + + return externalResourceConfigs; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * Get the map of resource name and amount of all of enabled external resources. + */ + private static Map<String, Long> getExternalResourceAmountMap(Configuration config) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceAmountMap = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final Optional<Long> amountOpt = config.getOptional(amountOption); + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + } else { + externalResourceAmountMap.put(resourceName, amountOpt.get()); + } + } + + return externalResourceAmountMap; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * are mapped by its resource name. + */ + @VisibleForTesting + static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<String> driverClassOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX)) + .stringType() + .noDefaultValue(); + final String driverFactoryClassName = config.getString(driverClassOption); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.", + resourceName, driverClassOption.key()); + continue; + } + + ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName); + if (externalResourceDriverFactory != null) { + DelegatingConfiguration delegatingConfiguration = + new DelegatingConfiguration(config, ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX)); + try { + externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration)); + LOG.info("Add external resources driver for {}.", resourceName); + } catch (Exception e) { + LOG.warn("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e); + } + } else { + LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName); + } + } + + return externalResourceDrivers; + } + + /** + * Instantiate the {@link ExternalResourceInfoProvider} for all of enabled external resources. + */ + public static ExternalResourceInfoProvider getExternalResourceInfoProvider(Configuration config, PluginManager pluginManager) { + final Map<String, ExternalResourceDriver> externalResourceDrivers = externalResourceDriversFromConfig(config, pluginManager); + final Map<String, Long> externalResourceAmountMap = getExternalResourceAmountMap(config); + final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>(); + for (Map.Entry<String, ExternalResourceDriver> externalResourceDriverEntry : externalResourceDrivers.entrySet()) { + final String resourceName = externalResourceDriverEntry.getKey(); + final ExternalResourceDriver externalResourceDriver = externalResourceDriverEntry.getValue(); + if (externalResourceAmountMap.containsKey(resourceName)) { + try { + final Set<? extends ExternalResourceInfo> externalResourceInfos; + externalResourceInfos = externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName)); + externalResources.put(resourceName, externalResourceInfos); + } catch (Exception e) { + LOG.error("Failed to retrieve information of external resource {}. {}", resourceName, e); Review comment: ```suggestion LOG.warn("Failed to retrieve information of external resource {}.", resourceName, e); ``` you don't have to specify a place holder for exceptions. They will be still logged. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.externalresource; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.externalresource.ExternalResourceDriver; +import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory; +import org.apache.flink.api.common.externalresource.ExternalResourceInfo; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ExternalResourceOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Utility class for external resource framework. + */ +public class ExternalResourceUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalResourceUtils.class); + + private ExternalResourceUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); + } + + /** + * Get the enabled external resource list from configuration. + */ + private static Set<String> getExternalResourceSet(Configuration config) { + return new HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST)); + } + + /** + * Get the external resources map. + */ + public static Map<String, Long> getExternalResources(Configuration config, String suffix) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceConfigs = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final ConfigOption<String> configKeyOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix)) + .stringType() + .noDefaultValue(); + final String configKey = config.getString(configKeyOption); + final Optional<Long> amountOpt = config.getOptional(amountOption); + + if (StringUtils.isNullOrWhitespaceOnly(configKey)) { + LOG.warn("Could not find valid {} for {}. Will ignore that resource.", configKeyOption.key(), resourceName); + continue; + } + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + continue; + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + continue; + } + + if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) { + LOG.warn("Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.", configKey, resourceName, amountOpt); + } else { + LOG.info("Add external resource config for {} with key {} value {}.", resourceName, configKey, amountOpt); + } + } + + return externalResourceConfigs; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * Get the map of resource name and amount of all of enabled external resources. + */ + private static Map<String, Long> getExternalResourceAmountMap(Configuration config) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<String, Long> externalResourceAmountMap = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<Long> amountOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)) + .longType() + .noDefaultValue(); + final Optional<Long> amountOpt = config.getOptional(amountOption); + if (!amountOpt.isPresent()) { + LOG.warn("The amount of the {} should be configured. Will ignore that resource.", resourceName); + } else if (amountOpt.get() <= 0) { + LOG.warn("The amount of the {} should be positive while finding {}. Will ignore that resource.", amountOpt.get(), resourceName); + } else { + externalResourceAmountMap.put(resourceName, amountOpt.get()); + } + } + + return externalResourceAmountMap; + } + + /** + * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all of enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers} + * are mapped by its resource name. + */ + @VisibleForTesting + static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) { + final Set<String> resourceSet = getExternalResourceSet(config); + LOG.info("Enabled external resources: {}", resourceSet); + + if (resourceSet.isEmpty()) { + return Collections.emptyMap(); + } + + final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class); + final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>(); + factoryIterator.forEachRemaining( + externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory)); + + final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>(); + for (String resourceName: resourceSet) { + final ConfigOption<String> driverClassOption = + key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX)) + .stringType() + .noDefaultValue(); + final String driverFactoryClassName = config.getString(driverClassOption); + if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) { + LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.", + resourceName, driverClassOption.key()); + continue; + } + + ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName); + if (externalResourceDriverFactory != null) { + DelegatingConfiguration delegatingConfiguration = + new DelegatingConfiguration(config, ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX)); + try { + externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration)); + LOG.info("Add external resources driver for {}.", resourceName); + } catch (Exception e) { + LOG.warn("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e); + } + } else { + LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName); + } + } + + return externalResourceDrivers; + } + + /** + * Instantiate the {@link ExternalResourceInfoProvider} for all of enabled external resources. + */ + public static ExternalResourceInfoProvider getExternalResourceInfoProvider(Configuration config, PluginManager pluginManager) { + final Map<String, ExternalResourceDriver> externalResourceDrivers = externalResourceDriversFromConfig(config, pluginManager); + final Map<String, Long> externalResourceAmountMap = getExternalResourceAmountMap(config); + final Map<String, Set<? extends ExternalResourceInfo>> externalResources = new HashMap<>(); + for (Map.Entry<String, ExternalResourceDriver> externalResourceDriverEntry : externalResourceDrivers.entrySet()) { + final String resourceName = externalResourceDriverEntry.getKey(); + final ExternalResourceDriver externalResourceDriver = externalResourceDriverEntry.getValue(); + if (externalResourceAmountMap.containsKey(resourceName)) { + try { + final Set<? extends ExternalResourceInfo> externalResourceInfos; + externalResourceInfos = externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName)); + externalResources.put(resourceName, externalResourceInfos); + } catch (Exception e) { + LOG.error("Failed to retrieve information of external resource {}. {}", resourceName, e); + } + } else { + LOG.error("Could not found legal amount configuration for {}.", resourceName); Review comment: ```suggestion LOG.warn("Could not found legal amount configuration for {}.", resourceName); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
