tillrohrmann commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425173912



##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -113,12 +113,17 @@
 
        private TestingFatalErrorHandler testingFatalErrorHandler;
 
-       @Before
-       public void setup() throws Exception {
-               super.setup();
+       @Override
+       protected void setupFlinkConfig() {
+               super.setupFlinkConfig();
 
                flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1024m"));
                flinkConfig.setString(TaskManagerOptions.RPC_PORT, 
String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
+       }
+
+       @Before
+       public void setup() throws Exception {

Review comment:
       Technically, we are overriding `super.setup` here.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
##########
@@ -71,13 +71,17 @@
 
        protected FlinkKubeClient flinkKubeClient;
 
-       @Before
-       public void setup() throws Exception {
+       protected void setupFlinkConfig() {
                flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, 
NAMESPACE);
                flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, 
CLUSTER_ID);
                flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, 
CONTAINER_IMAGE);
                
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, 
CONTAINER_IMAGE_PULL_POLICY);
                flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
+       }
+
+       @Before
+       public void setup() throws Exception {

Review comment:
       Not necessary, just an idea: Instead of making these methods 
overridable, one could also make this method final and add an empty `onSetup()` 
method which can be overridden by subclasses. That way, subclasses don't have 
to remember to call `super.setup` (they cannot modify the control flow of the 
parent class) and one would not have to add the `@Before` annotation.
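
A minimal sketch of that idea, assuming nothing beyond plain Java. The class names `TestBaseSketch` and `ResourceManagerTestSketch` and the `steps` list are hypothetical and only there to make the call order visible. In the real JUnit 4 base class, `setup()` would carry the `@Before` annotation and could declare `throws Exception`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class; in a JUnit 4 test, setup() would be annotated with @Before. */
abstract class TestBaseSketch {

    /** Records the order of setup steps, for illustration only. */
    protected final List<String> steps = new ArrayList<>();

    /** Final: subclasses cannot override it, so they cannot skip the parent's setup. */
    public final void setup() {
        steps.add("base-setup");
        onSetup();
    }

    /** Empty hook that subclasses may override; it runs after the parent's setup. */
    protected void onSetup() {
    }
}

/** Hypothetical subclass: no @Before and no super.setup() call required. */
class ResourceManagerTestSketch extends TestBaseSketch {

    @Override
    protected void onSetup() {
        steps.add("subclass-setup");
    }
}
```

With this layout the parent controls the order of the steps, and a subclass only fills in the hook.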

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /** The suffix of custom config options' prefix for the external 
resource. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = 
"param.";
+
+       /** The naming pattern of custom config options for the external 
resource. This is used as a suffix. */
+       private static final String 
EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = 
EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.

Review comment:
       I have to admit that a short example could help explain what this 
option does. Just from the description, without looking at the code, I am not 
entirely sure that I understand what it does.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /** The suffix of custom config options' prefix for the external 
resource. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = 
"param.";
+
+       /** The naming pattern of custom config options for the external 
resource. This is used as a suffix. */
+       private static final String 
EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = 
EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .noDefaultValue()
+                       .withDescription("The amount for the external resource 
specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of 
container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this 
key to the resource profile of container request to Yarn. " +
+                               "The value will be set to the value of " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * If configured, Flink will add "resources.limits.&gt;config-key&lt;" 
and "resources.requests.&gt;config-key&lt;" to the main
+        * container of TaskExecutor and set the value to {@link 
ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add 
\"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+                               "to the main container of TaskExecutor and set 
the value to the value of " + EXTERNAL_RESOURCE_AMOUNT.key() + ".");

Review comment:
       This option should go into the `flink-kubernetes` module.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceDriver.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Set;
+
+/**
+ * Driver which takes the responsibility to manage and provide the information 
of external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceDriver {
+
+       /**
+        * Retrieve the information of the external resources according to the 
amount.
+        */

Review comment:
       JavaDoc is not complete. I think it would be good to explain the 
parameter and the return value.
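
For illustration, a complete JavaDoc could look like the sketch below. The method signature is not part of the quoted hunk, so the method name `retrieveResourceInfo`, the `long amount` parameter, the return type and the stand-in type `ResourceInfoSketch` are assumptions made for this example.

```java
import java.util.Set;

/** Hypothetical stand-in for the information type returned by a driver. */
interface ResourceInfoSketch {
}

/** Driver which manages and provides the information of an external resource. */
interface ExternalResourceDriverSketch {

    /**
     * Retrieves the information of the external resources according to the amount.
     *
     * @param amount the amount of the external resource to retrieve information for
     * @return the information of the retrieved external resources (assumed here to be
     *     an empty set if nothing is available)
     */
    Set<? extends ResourceInfoSketch> retrieveResourceInfo(long amount);
}
```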

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Contains the information of an external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceInfo {
+
+       /**
+        * Get the property indicated by the specified key.
+        * If the key does not exist, it will return null.
+        */

Review comment:
       I think proper JavaDocs with `@param` and `@return` would be a bit more 
complete.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";

Review comment:
       `flink-core` should not know about Yarn and Kubernetes. Moreover, Mesos 
feels a bit excluded now ;-)
   
   Can we get rid of the integration-specific config options which are defined in 
`flink-core`?
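
One possible shape of such a split, as a rough sketch: the generic key construction stays in the core module, and each integration module owns its suffix. All three class names below are hypothetical. The prefix and the suffix strings are the ones from the diff above.

```java
/** Stand-in for the generic part that could stay in flink-core (hypothetical class). */
final class CoreExternalResourceKeys {

    static final String PREFIX = "external-resource";

    private CoreExternalResourceKeys() {
    }

    /** Builds "external-resource.<resourceName>.<suffix>". */
    static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
        if (resourceName == null || resourceName.isEmpty()) {
            throw new IllegalArgumentException("resourceName must not be empty");
        }
        return PREFIX + "." + resourceName + "." + suffix;
    }
}

/** Stand-in for a class that would live in the flink-yarn module (hypothetical). */
final class YarnExternalResourceKeys {

    static final String CONFIG_KEY_SUFFIX = "yarn.config-key";

    private YarnExternalResourceKeys() {
    }

    static String configKeyFor(String resourceName) {
        return CoreExternalResourceKeys.keyWithResourceNameAndSuffix(resourceName, CONFIG_KEY_SUFFIX);
    }
}

/** Stand-in for a class that would live in the flink-kubernetes module (hypothetical). */
final class KubernetesExternalResourceKeys {

    static final String CONFIG_KEY_SUFFIX = "kubernetes.config-key";

    private KubernetesExternalResourceKeys() {
    }

    static String configKeyFor(String resourceName) {
        return CoreExternalResourceKeys.keyWithResourceNameAndSuffix(resourceName, CONFIG_KEY_SUFFIX);
    }
}
```

The core class then has no knowledge of any deployment target, and another integration could add its own suffix without touching it.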

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);
+       }
+
+       @Test
+       public void testNotConfigureFactoryClass() {

Review comment:
       ```suggestion
        public void testNotConfiguredFactoryClass() {
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {

Review comment:
       I agree. Maybe even a short snippet of an example configuration could 
help:
   
   ```
   external-resources: gpu;fpga
   external-resource.gpu.param.type: Nvidia
   external-resource.gpu.driver-factory.class: GpuDriverFactory
   external-resource.gpu.amount: 2
   
   external-resource.fpga.driver-factory.class: FpgaDriverFactory
   external-resource.fpga.amount: 1
   ```
   
   If I am not mistaken here.
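
To make the role of the list option concrete, here is a small sketch of how such a configuration could be read, using a plain `Map<String, String>` in place of Flink's `Configuration`. The class and method names are hypothetical and this is not the code from the PR. It only illustrates that entries take effect for the resource names listed under `external-resources`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ExternalResourceConfigSketch {

    private ExternalResourceConfigSketch() {
    }

    /** Returns the configured amount for every resource name listed under "external-resources". */
    static Map<String, Long> amountsOfListedResources(Map<String, String> config) {
        Map<String, Long> amounts = new LinkedHashMap<>();
        String list = config.getOrDefault("external-resources", "");
        for (String rawName : list.split(";")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            // Keys follow the pattern external-resource.<resource_name>.amount.
            String amount = config.get("external-resource." + name + ".amount");
            if (amount != null) {
                amounts.put(name, Long.parseLong(amount.trim()));
            }
        }
        return amounts;
    }
}
```

An `external-resource.tpu.amount` entry would be ignored by this sketch unless `tpu` also appears in the list.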

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceSetResourceInformationMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceGetResourcesMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetNameMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetValueMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationNewInstanceMethod;
+
+       private final boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+       }
+
+       @VisibleForTesting
+       ResourceInformationReflector(String resourceClassName, String 
resourceInfoClassName) {
+               Method resourceSetResourceInformationMethod = null;
+               Method resourceGetResourcesMethod = null;
+               Method resourceInformationGetNameMethod = null;
+               Method resourceInformationGetValueMethod = null;
+               Method resourceInformationNewInstanceMethod = null;
+               boolean isYarnResourceTypesAvailable = false;
+               try {
+                       final Class<?> resourceClass = 
Class.forName(resourceClassName);
+                       final Class<?> resourceInfoClass = 
Class.forName(resourceInfoClassName);
+                       resourceSetResourceInformationMethod = 
resourceClass.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       resourceGetResourcesMethod = 
resourceClass.getMethod("getResources");
+                       resourceInformationGetNameMethod = 
resourceInfoClass.getMethod("getName");
+                       resourceInformationGetValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInformationNewInstanceMethod = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       LOG.debug("The underlying Yarn does not support 
external resource.", e);
+               } finally {
+                       this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
+                       this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
+                       this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
+                       this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
+                       this.resourceInformationNewInstanceMethod = 
resourceInformationNewInstanceMethod;
+                       this.isYarnResourceTypesAvailable = 
isYarnResourceTypesAvailable;
+               }
+       }
+
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, 
long amount) {
+               setResourceInformationUnSafe(resource, resourceName, amount);
+       }
+
+       /**
+        * Same as {@link #setResourceInformation(Resource, String, long)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       void setResourceInformationUnSafe(Object resource, String resourceName, 
long amount) {
+               if (!isYarnResourceTypesAvailable) {
+                       LOG.info("Will not request extended resource {} because 
the underlying YARN does not support it.", resourceName);
+                       return;
+               }
+               try {
+                       resourceSetResourceInformationMethod.invoke(
+                               resource,
+                               resourceName,
+                               
resourceInformationNewInstanceMethod.invoke(null, resourceName, amount));
+               } catch (Exception e) {
+                       LOG.error("Error in setting the external resource {}, 
will not request it to YARN. {}", resourceName, e);

Review comment:
       ```suggestion
                        LOG.warn("Error in setting the external resource {}. 
Will not request this resource from YARN.", resourceName, e);
   ```
   
   since Flink is still functioning afterwards.
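
The pattern behind this comment, sketched in a self-contained way: an optional method is looked up reflectively, and a failed invocation is logged as a warning while the caller carries on. To avoid external dependencies the sketch uses `java.util.logging` instead of SLF4J, and the class `OptionalMethodSketch` is hypothetical.

```java
import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;

final class OptionalMethodSketch {

    private static final Logger LOG = Logger.getLogger(OptionalMethodSketch.class.getName());

    /** Null iff the reflective lookup failed. */
    private final Method method;

    OptionalMethodSketch(String className, String methodName, Class<?>... parameterTypes) {
        Method found = null;
        try {
            found = Class.forName(className).getMethod(methodName, parameterTypes);
        } catch (ReflectiveOperationException | LinkageError e) {
            // A missing optional feature is expected on some setups, so log at a low level.
            LOG.log(Level.FINE, "Optional method " + methodName + " is not available.", e);
        }
        this.method = found;
    }

    boolean isAvailable() {
        return method != null;
    }

    /** Invokes the method if available; returns false instead of failing the caller. */
    boolean tryInvoke(Object target, Object... args) {
        if (method == null) {
            return false;
        }
        try {
            method.invoke(target, args);
            return true;
        } catch (Exception e) {
            // The caller keeps working without this feature, hence a warning and not an error.
            LOG.log(Level.WARNING, "Error invoking " + method.getName() + ". Skipping this optional feature.", e);
            return false;
        }
    }
}
```

Passing the exception as the last logger argument keeps the stack trace in the log output.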

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceSetResourceInformationMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceGetResourcesMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetNameMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetValueMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationNewInstanceMethod;
+
+       private final boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+       }
+
+       @VisibleForTesting
+       ResourceInformationReflector(String resourceClassName, String 
resourceInfoClassName) {
+               Method resourceSetResourceInformationMethod = null;
+               Method resourceGetResourcesMethod = null;
+               Method resourceInformationGetNameMethod = null;
+               Method resourceInformationGetValueMethod = null;
+               Method resourceInformationNewInstanceMethod = null;
+               boolean isYarnResourceTypesAvailable = false;
+               try {
+                       final Class<?> resourceClass = 
Class.forName(resourceClassName);
+                       final Class<?> resourceInfoClass = 
Class.forName(resourceInfoClassName);
+                       resourceSetResourceInformationMethod = 
resourceClass.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       resourceGetResourcesMethod = 
resourceClass.getMethod("getResources");
+                       resourceInformationGetNameMethod = 
resourceInfoClass.getMethod("getName");
+                       resourceInformationGetValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInformationNewInstanceMethod = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       LOG.debug("The underlying Yarn does not support 
external resource.", e);
+               } finally {
+                       this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
+                       this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
+                       this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
+                       this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
+                       this.resourceInformationNewInstanceMethod = 
resourceInformationNewInstanceMethod;
+                       this.isYarnResourceTypesAvailable = 
isYarnResourceTypesAvailable;

Review comment:
       ```suggestion
               if (isYarnResourceTypesAvailable) {
                        this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
                        this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
                        this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
                        this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
                        this.resourceInformationNewInstanceMethod = 
resourceInformationNewInstanceMethod;
                        this.isYarnResourceTypesAvailable = 
isYarnResourceTypesAvailable;
                        }
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);
+       }
+
+       @Test
+       public void testNotConfigureFactoryClass() {
+               final Configuration config = new Configuration();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));
+       }
+
+       @Test
+       public void testFactoryPluginNotExist() {

Review comment:
       ```suggestion
        public void testFactoryPluginDoesNotExist() {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -170,4 +201,55 @@ private ExternalResourceUtils() {
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from drivers. Index by the 
resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfo(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {

Review comment:
       ```suggestion
        public static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfos(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);
+       }
+
+       @Test
+       public void testNotConfigureFactoryClass() {
+               final Configuration config = new Configuration();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));
+       }
+
+       @Test
+       public void testFactoryPluginNotExist() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(Collections.emptyMap());
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));
+       }
+
+       @Test
+       public void testFactoryFailedToCreateDriver() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingFailedExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new 
TestingFailedExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));

Review comment:
       ```suggestion
                assertThat(externalResourceDrivers, empty());
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -176,6 +178,12 @@
        @PublicEvolving
        Histogram getHistogram(String name);
 
+       /**
+        * Get the specific external resource information by the resourceName.
+        */

Review comment:
       JavaDoc is not complete.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -170,4 +201,55 @@ private ExternalResourceUtils() {
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from drivers. Index by the 
resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfo(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {
+               return 
SharedExternalResources.INSTANCE.getSharedExternalResourceInfo(externalResourceDrivers,
 configuration);
+       }
+
+       private static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfoInternal(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {
+               final Map<String, Long> externalResourceAmountMap = 
getExternalResourceAmountMap(configuration);
+               final Map<String, Set<? extends ExternalResourceInfo>> 
externalResources = new HashMap<>();
+               for (Map.Entry<String, ExternalResourceDriver> 
externalResourceDriverEntry : externalResourceDrivers.entrySet()) {
+                       final String resourceName = 
externalResourceDriverEntry.getKey();
+                       final ExternalResourceDriver externalResourceDriver = 
externalResourceDriverEntry.getValue();
+                       if 
(externalResourceAmountMap.containsKey(resourceName)) {
+                               try {
+                                       final Set<? extends 
ExternalResourceInfo> externalResourceInfos;
+                                       externalResourceInfos = 
externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName));
+                                       externalResources.put(resourceName, 
externalResourceInfos);
+                               } catch (Exception e) {
+                                       LOG.error("Failed to retrieve 
information of external resource {}. {}", resourceName, e);
+                               }
+                       } else {
+                               LOG.error("Could not found legal amount 
configuration for {}.", resourceName);
+                       }
+               }
+               return externalResources;
+       }
+
+       /**
+        * Currently, the external resource is shared in TaskExecutor level. 
This class ensures that we retrieve the information

Review comment:
       ```suggestion
         * Currently, the external resource can be shared by multiple 
TaskExecutors. This class ensures that we retrieve the information
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceDriverFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Factory for {@link ExternalResourceDriver}. Instantiate a driver with 
configuration.
+ */
+@PublicEvolving
+public interface ExternalResourceDriverFactory {
+       /**
+        * Construct the ExternalResourceDriver from configuration.
+        */

Review comment:
       JavaDoc is not complete.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+
+/**
+ * Contains the information of an external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceInfo {
+
+       /**
+        * Get the property indicated by the specified key.
+        */
+       String getProperty(String key);

Review comment:
       I would prefer to return an `Optional`. Many of the existing Java 
key-value data structures were implemented before `Optional` was available. If they 
were re-implemented, I bet that some of them would also return `Optional`. The 
benefit of `Optional` is that we make it explicit to the user that they have to 
check whether the value is there or not.
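
       For illustration, a minimal sketch of an `Optional`-returning lookup. 
The `ResourceInfo` interface and `MapBackedResourceInfo` class below are 
hypothetical stand-ins, not the PR's `ExternalResourceInfo`; only the shape of 
`getProperty` is the point.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalPropertySketch {

    /** Hypothetical variant of the info interface whose lookup returns an Optional. */
    interface ResourceInfo {
        Optional<String> getProperty(String key);
    }

    /** Hypothetical map-backed implementation, used only for this illustration. */
    static final class MapBackedResourceInfo implements ResourceInfo {
        private final Map<String, String> properties;

        MapBackedResourceInfo(Map<String, String> properties) {
            // Defensive copy so later changes to the caller's map are not visible here.
            this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
        }

        @Override
        public Optional<String> getProperty(String key) {
            // ofNullable turns a missing key into Optional.empty() instead of null.
            return Optional.ofNullable(properties.get(key));
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("index", "0");
        ResourceInfo info = new MapBackedResourceInfo(props);

        // The caller has to decide what happens when the property is absent.
        System.out.println(info.getProperty("index").orElse("<absent>"));
        System.out.println(info.getProperty("vendor").orElse("<absent>"));
    }
}
```

With a plain `String` return type, the second lookup would hand back `null` and 
the missing check would only surface later as a `NullPointerException`.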

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       private static Set<String> getExternalResourceSet(Configuration config) 
{
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes. The key should be 
used for deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map.
+        */
+       @VisibleForTesting
+       static Map<String, Long> getExternalResources(Configuration config, 
String suffix) {

Review comment:
       Instead of mapping the external resource amounts to the integration 
specific configuration key, one could also do the mapping in the respective 
implementations (e.g. `YarnResourceManager`, `KubernetesResourceManager`).
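
       A rough sketch of that split, under stated assumptions: a plain 
`Map<String, String>` stands in for Flink's `Configuration`, the 
`external-resource.` key prefix is assumed (only the `amount` and 
`yarn.config-key` suffixes appear in this PR), and both method names are 
hypothetical. The generic utility only knows resource names and amounts; the 
Yarn side does the translation to its own keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PerIntegrationMappingSketch {

    // Assumed key layout "<prefix><resourceName>.<suffix>"; the prefix is an assumption.
    static final String PREFIX = "external-resource.";
    static final String AMOUNT_SUFFIX = "amount";
    static final String YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";

    /** Generic part: resource name -> amount, with no deployment-specific knowledge. */
    static Map<String, Long> amountsByResourceName(Map<String, String> config, Iterable<String> resourceNames) {
        Map<String, Long> amounts = new HashMap<>();
        for (String name : resourceNames) {
            String raw = config.get(PREFIX + name + "." + AMOUNT_SUFFIX);
            if (raw != null) {
                amounts.put(name, Long.parseLong(raw));
            }
        }
        return amounts;
    }

    /** Yarn-side part (would live in the Yarn module): resource name -> Yarn config key. */
    static Map<String, Long> toYarnResources(Map<String, String> config, Map<String, Long> amounts) {
        Map<String, Long> yarnResources = new HashMap<>();
        for (Map.Entry<String, Long> entry : amounts.entrySet()) {
            String yarnKey = config.get(PREFIX + entry.getKey() + "." + YARN_CONFIG_KEY_SUFFIX);
            if (yarnKey != null) {
                // Resources without a Yarn config key are skipped.
                yarnResources.put(yarnKey, entry.getValue());
            }
        }
        return yarnResources;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("external-resource.gpu.amount", "2");
        config.put("external-resource.gpu.yarn.config-key", "yarn.io/gpu"); // example value

        Map<String, Long> amounts = amountsByResourceName(config, Arrays.asList("gpu", "fpga"));
        System.out.println(amounts);
        System.out.println(toYarnResources(config, amounts));
    }
}
```

A Kubernetes counterpart would look the same with its own suffix, so the shared 
utility no longer needs to know about either integration.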

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       private static Set<String> getExternalResourceSet(Configuration config) 
{
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));

Review comment:
       ```suggestion
   return new 
HashSet<>(config.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST));
   ```
   
   Could this work as well?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +122,52 @@ private ExternalResourceUtils() {
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled 
external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> 
externalResourceDriversFromConfig(Configuration config, PluginManager 
pluginManager) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = 
pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> 
externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> 
externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(),
 externalResourceDriverFactory));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<String> driverClassOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String driverFactoryClassName = 
config.getString(driverClassOption);
+                       if 
(StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for 
{}. Please make sure {} is configured.",

Review comment:
       ```suggestion
                                LOG.warn("Could not find driver class name for 
{}. Please make sure {} is configured.",
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       private static Set<String> getExternalResourceSet(Configuration config) 
{
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes. The key should be 
used for deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }

Review comment:
       I think these methods belong in the specific modules for Yarn and 
Kubernetes.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceDriver.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Set;
+
+/**
+ * Driver which takes the responsibility to manage and provide the information 
of external resource.
+ */

Review comment:
       I think a bit more explanation of how the interfaces 
`ExternalResourceDriver`, `ExternalResourceDriverFactory` and 
`ExternalResourceInfo` play together could be helpful for the interface 
definitions.
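
       One way to picture the relationship, as a simplified sketch rather than 
the PR's actual interfaces: a factory builds a driver from configuration, the 
driver is asked for a given amount of the resource, and it answers with a set 
of info objects. `Info`, `Driver`, `DriverFactory`, the factory method name 
`create`, and the `Map`-based configuration are all hypothetical stand-ins; 
only `retrieveResourceInfo` and `getProperty` are names taken from the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class DriverInterplaySketch {

    /** Stand-in for ExternalResourceInfo: describes one unit of a resource. */
    interface Info {
        Optional<String> getProperty(String key);
    }

    /** Stand-in for ExternalResourceDriver: reports the units available for an amount. */
    interface Driver {
        Set<? extends Info> retrieveResourceInfo(long amount);
    }

    /** Stand-in for ExternalResourceDriverFactory; the method name "create" is hypothetical. */
    interface DriverFactory {
        Driver create(Map<String, String> config);
    }

    /** Simple map-backed info object. */
    static final class MapInfo implements Info {
        private final Map<String, String> properties;

        MapInfo(Map<String, String> properties) {
            this.properties = properties;
        }

        @Override
        public Optional<String> getProperty(String key) {
            return Optional.ofNullable(properties.get(key));
        }
    }

    /** Toy factory whose driver reports one Info per requested unit. */
    static final class IndexedDriverFactory implements DriverFactory {
        @Override
        public Driver create(Map<String, String> config) {
            final String vendor = config.getOrDefault("vendor", "unknown");
            return amount -> {
                Set<Info> infos = new HashSet<>();
                for (long i = 0; i < amount; i++) {
                    Map<String, String> props = new HashMap<>();
                    props.put("index", Long.toString(i));
                    props.put("vendor", vendor);
                    infos.add(new MapInfo(props));
                }
                return infos;
            };
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("vendor", "acme");

        // Factory -> driver -> set of infos, in that order.
        Driver driver = new IndexedDriverFactory().create(config);
        Set<? extends Info> infos = driver.retrieveResourceInfo(2);
        System.out.println("units: " + infos.size());
    }
}
```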

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {

Review comment:
       ```suggestion
   public class ExternalResourceUtilsTest extends TestLogger {
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = "driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
+
+       /** Defines the configuration key of that external resource in Kubernetes. This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /** The suffix of custom config options' prefix for the external resource. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX = "param.";
+
+       /** The naming pattern of custom config options for the external resource. This is used as a suffix. */
+       private static final String EXTERNAL_RESOURCE_DRIVER_PARAM_PATTERN_SUFFIX = EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX + "<param>";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = "external-resource";
+
+       /**
+        * List of the resource_name of all external resources with delimiter ";". The resource_name will be used to
+        * splice related config options for external resource. Only the resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the ExternalResourceDriver at the TaskExecutor side.");
+
+       /**
+        * The amount for the external resource specified by &gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .noDefaultValue()
+                       .withDescription("The amount for the external resource specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this key to the resource profile of container request to Yarn. " +
+                               "The value will be set to the value of " + EXTERNAL_RESOURCE_AMOUNT.key() + ".");

Review comment:
       I think this option should go into the `flink-yarn` module.
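
For orientation while relocating the option: the suffix constants in this file are combined with `EXTERNAL_RESOURCE_PREFIX` and a resource name to form the concrete key. The helper bodies are not part of the quoted hunk, so the following is only a sketch that assumes a dot-separated `external-resource.<resource_name>.<suffix>` layout; the class name `ExternalResourceKeys` is hypothetical.

```java
/** Hypothetical sketch of how per-resource config keys could be composed. */
final class ExternalResourceKeys {

    // Values copied from the constants in the quoted hunk.
    static final String PREFIX = "external-resource";
    static final String YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";

    /** Assumed layout: external-resource.<resource_name>.<suffix>. */
    static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
        if (resourceName == null || resourceName.trim().isEmpty()) {
            throw new IllegalArgumentException("resourceName must not be empty");
        }
        return String.format("%s.%s.%s", PREFIX, resourceName, suffix);
    }

    /** Placeholder variant of the key, as it might appear in generated docs. */
    static String genericKeyWithSuffix(String suffix) {
        return keyWithResourceNameAndSuffix("<resource_name>", suffix);
    }
}
```

Under that assumption, a resource named `gpu` would be mapped to its YARN resource through the key `external-resource.gpu.yarn.config-key`.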

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+       static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceSetResourceInformationMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceGetResourcesMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetNameMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationGetValueMethod;
+
+       /** Could be Null iff isYarnResourceTypesAvailable is false. */
+       @Nullable
+       private final Method resourceInformationNewInstanceMethod;
+
+       private final boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+       }
+
+       @VisibleForTesting
+       ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
+               Method resourceSetResourceInformationMethod = null;
+               Method resourceGetResourcesMethod = null;
+               Method resourceInformationGetNameMethod = null;
+               Method resourceInformationGetValueMethod = null;
+               Method resourceInformationNewInstanceMethod = null;
+               boolean isYarnResourceTypesAvailable = false;
+               try {
+                       final Class<?> resourceClass = Class.forName(resourceClassName);
+                       final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
+                       resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
+                       resourceGetResourcesMethod = resourceClass.getMethod("getResources");
+                       resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
+                       resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
+                       resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       LOG.debug("The underlying Yarn does not support external resource.", e);
+               } finally {
+                       this.resourceSetResourceInformationMethod = resourceSetResourceInformationMethod;
+                       this.resourceGetResourcesMethod = resourceGetResourcesMethod;
+                       this.resourceInformationGetNameMethod = resourceInformationGetNameMethod;
+                       this.resourceInformationGetValueMethod = resourceInformationGetValueMethod;
+                       this.resourceInformationNewInstanceMethod = resourceInformationNewInstanceMethod;
+                       this.isYarnResourceTypesAvailable = isYarnResourceTypesAvailable;
+               }
+       }
+
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, long amount) {
+               setResourceInformationUnSafe(resource, resourceName, amount);
+       }
+
+       /**
+        * Same as {@link #setResourceInformation(Resource, String, long)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       void setResourceInformationUnSafe(Object resource, String resourceName, long amount) {
+               if (!isYarnResourceTypesAvailable) {
+                       LOG.info("Will not request extended resource {} because the underlying YARN does not support it.", resourceName);

Review comment:
       ```suggestion
                        LOG.info("Will not request extended resource {} because the used YARN version does not support it.", resourceName);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +122,52 @@ private ExternalResourceUtils() {
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> externalResourceDriversFromConfig(Configuration config, PluginManager pluginManager) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(), externalResourceDriverFactory));
+
+               final Map<String, ExternalResourceDriver> externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<String> driverClassOption =
+                               key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String driverFactoryClassName = config.getString(driverClassOption);
+                       if (StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not found driver class name for {}. Please make sure {} is configured.",
+                                       resourceName, driverClassOption.key());
+                               continue;
+                       }
+
+                       ExternalResourceDriverFactory externalResourceDriverFactory = externalResourceFactories.get(driverFactoryClassName);
+                       if (externalResourceDriverFactory != null) {
+                               DelegatingConfiguration delegatingConfiguration =
+                                       new DelegatingConfiguration(config, ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX));
+                               try {
+                                       externalResourceDrivers.put(resourceName, externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration));
+                                       LOG.info("Add external resources driver for {}.", resourceName);
+                               } catch (Exception e) {
+                                       LOG.error("Could not instantiate driver with factory {} for {}. {}", driverFactoryClassName, resourceName, e);

Review comment:
       ```suggestion
                                        LOG.warn("Could not instantiate driver with factory {} for {}.", driverFactoryClassName, resourceName, e);
   ```
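
The loop under review appears to treat a failing factory as recoverable: it logs the problem and carries on with the remaining resources. A minimal JDK-only sketch of that control flow follows. `BestEffortDriverLoader`, its `load` method, and the use of `Function` as a stand-in for `ExternalResourceDriverFactory` are all hypothetical, and `System.err` merely takes the place of the logger call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, JDK-only stand-in for the driver instantiation loop. */
final class BestEffortDriverLoader {

    /**
     * Builds one driver per resource name. A missing or failing factory only
     * skips that resource; the remaining resources are still processed.
     */
    static Map<String, Object> load(
            Map<String, String> factoryClassByResource,
            Map<String, Function<String, Object>> factoriesByClassName) {
        Map<String, Object> drivers = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : factoryClassByResource.entrySet()) {
            String resourceName = entry.getKey();
            Function<String, Object> factory = factoriesByClassName.get(entry.getValue());
            if (factory == null) {
                continue; // nothing registered under that class name
            }
            try {
                drivers.put(resourceName, factory.apply(resourceName));
            } catch (RuntimeException e) {
                // Recoverable: report it and move on to the next resource.
                System.err.println("Could not instantiate driver for " + resourceName + ": " + e);
            }
        }
        return drivers;
    }
}
```

In this sketch a broken factory costs only its own resource, which is the kind of situation where a warning tends to fit better than an error.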

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+       @VisibleForTesting
+       ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
+               Method resourceSetResourceInformationMethod = null;
+               Method resourceGetResourcesMethod = null;
+               Method resourceInformationGetNameMethod = null;
+               Method resourceInformationGetValueMethod = null;
+               Method resourceInformationNewInstanceMethod = null;
+               boolean isYarnResourceTypesAvailable = false;
+               try {
+                       final Class<?> resourceClass = Class.forName(resourceClassName);
+                       final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
+                       resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
+                       resourceGetResourcesMethod = resourceClass.getMethod("getResources");
+                       resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
+                       resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
+                       resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       LOG.debug("The underlying Yarn does not support external resource.", e);

Review comment:
       ```suggestion
                        LOG.debug("The underlying Yarn version does not support external resources.", e);
   ```
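
The constructor in this hunk looks up the Hadoop methods reflectively and records in a flag whether they were found, so that later calls can degrade gracefully on older versions. A minimal standalone sketch of that pattern follows, using JDK classes as stand-ins for the Hadoop ones; `OptionalMethodProbe` and its methods are hypothetical and not part of the PR.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for the reflector: probes for an optional method once, then degrades gracefully. */
final class OptionalMethodProbe {

    /** Null iff {@link #available} is false. */
    private final Method method;

    private final boolean available;

    OptionalMethodProbe(String className, String methodName, Class<?>... parameterTypes) {
        Method found = null;
        try {
            found = Class.forName(className).getMethod(methodName, parameterTypes);
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            // Expected when the optional API is absent; a caller could log this at debug level.
        }
        this.method = found;
        this.available = found != null;
    }

    boolean isAvailable() {
        return available;
    }

    /** Invokes the probed method, or returns the fallback if the API is missing or the call fails. */
    Object invokeOrDefault(Object target, Object fallback, Object... args) {
        if (!available) {
            return fallback;
        }
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException | IllegalArgumentException e) {
            return fallback;
        }
    }
}
```

Probing `java.lang.String#length` succeeds on any JDK, whereas a class name that does not exist leaves the probe unavailable and every call returns the fallback.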

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, long amount) {
+               setResourceInformationUnSafe(resource, resourceName, amount);
+       }
+
+       /**
+        * Same as {@link #setResourceInformation(Resource, String, long)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       void setResourceInformationUnSafe(Object resource, String resourceName, long amount) {
+               if (!isYarnResourceTypesAvailable) {
+                       LOG.info("Will not request extended resource {} because the underlying YARN does not support it.", resourceName);
+                       return;
+               }
+               try {
+                       resourceSetResourceInformationMethod.invoke(
+                               resource,
+                               resourceName,
+                               resourceInformationNewInstanceMethod.invoke(null, resourceName, amount));
+               } catch (Exception e) {
+                       LOG.error("Error in setting the external resource {}, will not request it to YARN. {}", resourceName, e);
+               }
+       }
+
+       /**
+        * Get the name and value of external resources from the {@link Resource}.
+        */
+       Map<String, Long> getExternalResources(Resource resource) {
+               return getExternalResourcesUnSafe(resource);
+       }
+
+       /**
+        * Same as {@link #getExternalResources(Resource)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       Map<String, Long> getExternalResourcesUnSafe(Object resource) {
+               if (!isYarnResourceTypesAvailable) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResources = new HashMap<>();
+               final Object[] externalResourcesInfo;
+               try {
+                       externalResourcesInfo = (Object[]) resourceGetResourcesMethod.invoke(resource);
+                       // The first two element would be cpu and mem.
+                       for (int i = 2; i < externalResourcesInfo.length; i++) {
+                               final String name = (String) resourceInformationGetNameMethod.invoke(externalResourcesInfo[i]);
+                               final long value = (long) resourceInformationGetValueMethod.invoke(externalResourcesInfo[i]);
+                               externalResources.put(name, value);
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error in getting the external resource.", e);

Review comment:
       ```suggestion
                        LOG.warn("Could not obtain the external resources supported by the given Resource.", e);
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -145,6 +150,12 @@ private boolean resourceWithinMaxAllocation(final InternalContainerResource reso
                return resource.memory <= maxMemMB && resource.vcores <= maxVcore;
        }
 
+       private static void setExternalResourceRequestIfPossible(Map<String, Long> externalResources, Resource resource) {

Review comment:
       Maybe a bit shorter:
   ```suggestion
        private static void trySetExternalResources(Map<String, Long> externalResources, Resource resource) {
   ```

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,162 @@
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, 
long amount) {
+               setResourceInformationUnSafe(resource, resourceName, amount);
+       }
+
+       /**
+        * Same as {@link #setResourceInformation(Resource, String, long)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       void setResourceInformationUnSafe(Object resource, String resourceName, 
long amount) {
+               if (!isYarnResourceTypesAvailable) {
+                       LOG.info("Will not request extended resource {} because 
the underlying YARN does not support it.", resourceName);
+                       return;
+               }
+               try {
+                       resourceSetResourceInformationMethod.invoke(
+                               resource,
+                               resourceName,
+                               
resourceInformationNewInstanceMethod.invoke(null, resourceName, amount));
+               } catch (Exception e) {
+                       LOG.error("Error in setting the external resource {}, 
will not request it to YARN. {}", resourceName, e);
+               }
+       }
+
+       /**
+        * Get the name and value of external resources from the {@link 
Resource}.
+        */
+       Map<String, Long> getExternalResources(Resource resource) {
+               return getExternalResourcesUnSafe(resource);
+       }
+
+       /**
+        * Same as {@link #getExternalResources(Resource)} but
+        * allows to pass objects that are not of type {@link Resource}.
+        */
+       @VisibleForTesting
+       Map<String, Long> getExternalResourcesUnSafe(Object resource) {
+               if (!isYarnResourceTypesAvailable) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResources = new HashMap<>();
+               final Object[] externalResourcesInfo;
+               try {
+                       externalResourcesInfo = (Object[]) 
resourceGetResourcesMethod.invoke(resource);
+                       // The first two element would be cpu and mem.
+                       for (int i = 2; i < externalResourcesInfo.length; i++) {
+                               final String name = (String) 
resourceInformationGetNameMethod.invoke(externalResourcesInfo[i]);
+                               final long value = (long) 
resourceInformationGetValueMethod.invoke(externalResourcesInfo[i]);
+                               externalResources.put(name, value);
+                       }

Review comment:
       Do we know whether this is a stable contract, i.e. that the first two elements are always cpu and memory?
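
       If the ordering turns out not to be guaranteed, one alternative is to skip the built-in entries by name rather than by position. The following is only a sketch under assumptions: `ResourceEntry` and `ExternalResourceFilter` are hypothetical stand-ins (the real code goes through reflection), and the names `memory-mb` and `vcores` are assumed to be what YARN uses for its built-in resources and should be checked against the targeted Hadoop version.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a YARN resource entry; only name and value matter here. */
final class ResourceEntry {
    final String name;
    final long value;

    ResourceEntry(String name, long value) {
        this.name = name;
        this.value = value;
    }
}

final class ExternalResourceFilter {
    // Assumption: the names YARN uses for its built-in memory and vcore resources.
    private static final Set<String> BUILT_IN =
        new HashSet<>(Arrays.asList("memory-mb", "vcores"));

    /** Collects every entry that is not a built-in resource, wherever it sits in the array. */
    static Map<String, Long> externalResources(ResourceEntry[] all) {
        final Map<String, Long> result = new LinkedHashMap<>();
        for (ResourceEntry entry : all) {
            if (!BUILT_IN.contains(entry.name)) {
                result.put(entry.name, entry.value);
            }
        }
        return result;
    }
}
```

       Filtering by name costs one set lookup per entry and no longer depends on where cpu and memory appear in the array.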

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+       private static final String RESOURCE_NAME = "test";
+       private static final long RESOURCE_VALUE = 1;
+
+       @Test
+       public void testSetResourceInformationIfMethodPresent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+               
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+               
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+       }
+
+       @Test
+       public void testGetResourceInformationIfMethodPresent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+               final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+               assertThat(externalResources.size(), is(1));
+               assertTrue(externalResources.containsKey(RESOURCE_NAME));
+               assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+       }
+
+       @Test
+       public void testSetResourceInformationIfMethodAbsent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+       }
+
+       @Test
+       public void testGetResourceInformationIfMethodAbsent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+               final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+               assertThat(externalResources.size(), is(0));
+       }
+
+       @Test
+       public void testSetAndGetExtendedResourcesWithYarnSupport() {
+               assumeTrue(HadoopUtils.isMinHadoopVersion(2, 10));
+
+               final Resource resource = Resource.newInstance(100, 1);
+
+               
ResourceInformationReflector.INSTANCE.setResourceInformation(resource, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               final Map<String, Long> externalResourcesResult = 
ResourceInformationReflector.INSTANCE.getExternalResources(resource);
+               assertTrue(externalResourcesResult.containsKey(RESOURCE_NAME));
+               assertThat(externalResourcesResult.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+       }
+
+       @Test
+       public void testSetAndGetExtendedResourcesWithoutYarnSupport() {
+               assumeTrue(HadoopUtils.isMaxHadoopVersion(3, 0));
+
+               final Resource resource = Resource.newInstance(100, 1);
+
+               // Should do nothing without leading to failure.
+               
ResourceInformationReflector.INSTANCE.setResourceInformation(resource, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               final Map<String, Long> externalResourcesResult = 
ResourceInformationReflector.INSTANCE.getExternalResources(resource);
+               assertTrue(externalResourcesResult.isEmpty());

Review comment:
       Why would this test pass if the Hadoop version is either `2.10+` or 
`3.0`? It seems to contradict `testSetAndGetExtendedResourcesWithYarnSupport`.
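
       The overlap can be made concrete with a small sketch. It assumes that `isMinHadoopVersion(2, 10)` means "at least 2.10" and that `isMaxHadoopVersion(3, 0)` means "strictly below 3.0"; `VersionGuards`, `isAtLeast` and `isBelow` are hypothetical helpers written for this illustration, not the `HadoopUtils` implementation.

```java
/** Hypothetical version predicates used only to illustrate the overlapping test guards. */
final class VersionGuards {

    /** True if major.minor is at least reqMajor.reqMinor. */
    static boolean isAtLeast(int major, int minor, int reqMajor, int reqMinor) {
        return major > reqMajor || (major == reqMajor && minor >= reqMinor);
    }

    /** True if major.minor is strictly below limitMajor.limitMinor. */
    static boolean isBelow(int major, int minor, int limitMajor, int limitMinor) {
        return major < limitMajor || (major == limitMajor && minor < limitMinor);
    }
}
```

       Under these assumptions Hadoop 2.10 satisfies both guards, so both tests would run there and expect opposite results. Guarding the "without support" test with a bound of 2.10 instead of 3.0 would make the two ranges disjoint.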

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+       private static final String RESOURCE_NAME = "test";
+       private static final long RESOURCE_VALUE = 1;
+
+       @Test
+       public void testSetResourceInformationIfMethodPresent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+               
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+               
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+       }
+
+       @Test
+       public void testGetResourceInformationIfMethodPresent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+               final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+               assertThat(externalResources.size(), is(1));
+               assertTrue(externalResources.containsKey(RESOURCE_NAME));
+               assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+       }
+
+       @Test
+       public void testSetResourceInformationIfMethodAbsent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+               
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+       }
+
+       @Test
+       public void testGetResourceInformationIfMethodAbsent() {
+               final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+               final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+               resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+               final Map<String, Long> externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+               assertThat(externalResources.size(), is(0));

Review comment:
       Maybe
   ```suggestion
                assertThat(externalResources.entrySet(), empty());
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +122,52 @@ private ExternalResourceUtils() {
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled 
external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> 
externalResourceDriversFromConfig(Configuration config, PluginManager 
pluginManager) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = 
pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> 
externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> 
externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(),
 externalResourceDriverFactory));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = new HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<String> driverClassOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String driverFactoryClassName = 
config.getString(driverClassOption);
+                       if 
(StringUtils.isNullOrWhitespaceOnly(driverFactoryClassName)) {
+                               LOG.warn("Could not find driver class name for {}. Please make sure {} is configured.",
+                                       resourceName, driverClassOption.key());
+                               continue;
+                       }
+
+                       ExternalResourceDriverFactory 
externalResourceDriverFactory = 
externalResourceFactories.get(driverFactoryClassName);
+                       if (externalResourceDriverFactory != null) {
+                               DelegatingConfiguration delegatingConfiguration 
=
+                                       new DelegatingConfiguration(config, 
ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_PARAM_SUFFIX));
+                               try {
+                                       
externalResourceDrivers.put(resourceName, 
externalResourceDriverFactory.createExternalResourceDriver(delegatingConfiguration));
+                                       LOG.info("Add external resources driver 
for {}.", resourceName);
+                               } catch (Exception e) {
+                                       LOG.error("Could not instantiate driver 
with factory {} for {}. {}", driverFactoryClassName, resourceName, e);
+                               }
+                       } else {
+                               LOG.error("Could not find factory class {} for 
{}.", driverFactoryClassName, resourceName);

Review comment:
       ```suggestion
                                LOG.warn("Could not find factory class {} for {}.", driverFactoryClassName, resourceName);
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -117,4 +122,52 @@ private ExternalResourceUtils() {
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled 
external resources. {@link ExternalResourceDriver}s

Review comment:
       ```suggestion
         * Instantiate the {@link ExternalResourceDriver ExternalResourceDrivers} for all enabled external resources. {@link ExternalResourceDriver ExternalResourceDrivers}
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
##########
@@ -45,6 +46,13 @@
        /** List of patterns for classes that should always be resolved from 
the parent ClassLoader. */
        private final String[] alwaysParentFirstPatterns;
 
+       @VisibleForTesting
+       PluginManager() {
+               parentClassLoader = null;
+               pluginDescriptors = null;
+               alwaysParentFirstPatterns = null;
+       }

Review comment:
       I think the clean solution for providing a `TestingPluginManager` 
implementation would be to extract an interface from the `PluginManager` and 
then have multiple implementations of it.
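
       A minimal sketch of that shape, under assumptions: `PluginLoader` is a hypothetical name for the extracted interface, the single `load` method mirrors how `pluginManager.load(...)` is called in this PR, and the `ServiceLoader`-based class only stands in for the production implementation (it is not Flink's plugin class-loading logic).

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;

/** Hypothetical interface extracted from the plugin manager. */
interface PluginLoader {
    <P> Iterator<P> load(Class<P> service);
}

/** Stand-in for the production implementation; resolves services via java.util.ServiceLoader. */
final class ServiceLoaderPluginLoader implements PluginLoader {
    @Override
    public <P> Iterator<P> load(Class<P> service) {
        return ServiceLoader.load(service).iterator();
    }
}

/** Test implementation that hands back pre-registered iterators, so no null fields are needed. */
final class TestingPluginLoader implements PluginLoader {
    private final Map<Class<?>, Iterator<?>> plugins;

    TestingPluginLoader(Map<Class<?>, Iterator<?>> plugins) {
        this.plugins = plugins;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <P> Iterator<P> load(Class<P> service) {
        final Iterator<?> registered = plugins.get(service);
        // Callers are expected to register iterators under the matching service class.
        return registered == null ? Collections.<P>emptyIterator() : (Iterator<P>) registered;
    }
}
```

       With an interface in place, the test double no longer needs a `@VisibleForTesting` constructor that leaves the production fields `null`.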

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##########
@@ -157,6 +158,13 @@
 
        GlobalAggregateManager getGlobalAggregateManager();
 
+       /**
+        * Get the enabled external resource drivers for external resources.
+        *
+        * @return the enabled external resource drivers for external resources
+        */
+       Map<String, ExternalResourceDriver> getExternalResourceDrivers();

Review comment:
       I would prefer to not expose the `ExternalResourceDriver` to all 
`Environment` users if possible. Wouldn't it be enough to expose 
`Set<ExternalResourceInfo> getExternalResourceInfo(String resourceName)`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##########
@@ -157,6 +158,13 @@
 
        GlobalAggregateManager getGlobalAggregateManager();
 
+       /**
+        * Get the enabled external resource drivers for external resources.
+        *
+        * @return the enabled external resource drivers for external resources

Review comment:
       maybe
   
   ```suggestion
         * @return a map of the enabled external resource drivers for external resources and their names
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);

Review comment:
       nit:
   ```suggestion
                assertThat(externalResourceDrivers.get(RESOURCE_NAME_1), instanceOf(TestingExternalResourceDriver.class));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -170,4 +201,55 @@ private ExternalResourceUtils() {
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from drivers. Index by the 
resourceName.

Review comment:
       ```suggestion
         * Get the external resource information from drivers. Indexed by the resource name.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
##########
@@ -183,8 +186,9 @@ protected void closeLocalStrategiesAndCaches() {
        @Override
        public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup 
metrics) {
                Environment env = getEnvironment();
+
                return new IterativeRuntimeUdfContext(env.getTaskInfo(), 
getUserCodeClassLoader(),
-                               getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+                               getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics, 
ExternalResourceUtils.getExternalResourceInfo(env.getExternalResourceDrivers(), 
env.getTaskManagerInfo().getConfiguration()));

Review comment:
       How expensive do we expect calls to the `ExternalResourceDrivers` to be? 
I'm just wondering because here we retrieve the `ExternalResourceInfos` for 
every task, independent of whether they need it or not. I think I would prefer 
that this feature would not add additional costs for tasks which don't want to 
use it.
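
       One way to keep the cost away from tasks that never ask for the information is to defer the driver call until first access. The sketch below is a generic memoizing wrapper; `Lazy` is a hypothetical helper written for this illustration and is not an existing Flink class.

```java
import java.util.function.Supplier;

/** Hypothetical memoizing supplier: the delegate runs at most once, on the first get(). */
final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private T value;
    private boolean initialized;

    Lazy(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            // Potentially expensive call, e.g. asking a driver to discover devices.
            value = delegate.get();
            initialized = true;
        }
        return value;
    }
}
```

       A runtime context could hold such a wrapper around the driver lookup and resolve it only when user code requests external resource information.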

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -170,4 +201,55 @@ private ExternalResourceUtils() {
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from drivers. Index by the 
resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfo(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {
+               return 
SharedExternalResources.INSTANCE.getSharedExternalResourceInfo(externalResourceDrivers,
 configuration);
+       }
+
+       private static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfoInternal(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {

Review comment:
       ```suggestion
        private static Map<String, Set<? extends ExternalResourceInfo>> getExternalResourceInfosInternal(Map<String, ExternalResourceDriver> externalResourceDrivers, Configuration configuration) {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);
+       }
+
+       @Test
+       public void testNotConfigureFactoryClass() {
+               final Configuration config = new Configuration();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));

Review comment:
       ```suggestion
                assertThat(externalResourceDrivers.entrySet(), empty());
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -155,4 +163,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), 
is(RESOURCE_AMOUNT_1));
                assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), 
is(RESOURCE_AMOUNT_2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(1));
+               assertTrue(externalResourceDrivers.get(RESOURCE_NAME_1) 
instanceof TestingExternalResourceDriver);
+       }
+
+       @Test
+       public void testNotConfigureFactoryClass() {
+               final Configuration config = new Configuration();
+               final Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
+               plugins.put(ExternalResourceDriverFactory.class, 
IteratorUtils.singletonIterator(new TestingExternalResourceDriverFactory()));
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(plugins);
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));
+       }
+
+       @Test
+       public void testFactoryPluginNotExist() {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
TestingExternalResourceDriverFactory.class.getName();
+               final PluginManager testingPluginManagerPluginManager = new 
TestingPluginManager(Collections.emptyMap());
+
+               config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1));
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX), 
driverFactoryClassName);
+
+               final Map<String, ExternalResourceDriver> 
externalResourceDrivers = 
ExternalResourceUtils.externalResourceDriversFromConfig(config, 
testingPluginManagerPluginManager);
+
+               assertThat(externalResourceDrivers.size(), is(0));

Review comment:
       ```suggestion
                assertThat(externalResourceDrivers.entrySet(), empty());
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##########
@@ -157,6 +158,13 @@
 
        GlobalAggregateManager getGlobalAggregateManager();
 
+       /**
+        * Get the enabled external resource drivers for external resources.
+        *
+        * @return the enabled external resource drivers for external resources
+        */
+       Map<String, ExternalResourceDriver> getExternalResourceDrivers();

Review comment:
       Or maybe the `Environment` could let you get an 
`ExternalResourceInfoProvider/Lookup` instead.
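
       A rough sketch of what such a provider could look like. Every name here is an assumption for illustration: `ResourceInfo` stands in for `ExternalResourceInfo`, and the method name and the static implementation are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/** Hypothetical marker type standing in for ExternalResourceInfo. */
interface ResourceInfo {}

/** Hypothetical narrow view exposed to tasks: lookup by resource name only, no driver access. */
interface ExternalResourceInfoProvider {
    Set<ResourceInfo> getExternalResourceInfos(String resourceName);
}

/** Simple implementation backed by infos that were already retrieved from the drivers. */
final class StaticExternalResourceInfoProvider implements ExternalResourceInfoProvider {
    private final Map<String, Set<ResourceInfo>> infosByName;

    StaticExternalResourceInfoProvider(Map<String, Set<ResourceInfo>> infosByName) {
        this.infosByName = infosByName;
    }

    @Override
    public Set<ResourceInfo> getExternalResourceInfos(String resourceName) {
        final Set<ResourceInfo> infos = infosByName.get(resourceName);
        // Unknown resource names yield an empty set rather than null.
        return infos == null
            ? Collections.<ResourceInfo>emptySet()
            : Collections.unmodifiableSet(infos);
    }
}
```

       Handing out only this interface keeps the `ExternalResourceDriver` instances internal, which is the concern raised on `getExternalResourceDrivers()`.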



