xintongsong commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r417157271



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /** User-defined params of that external resource. This is used as a 
suffix in an actual config. */
+       private static final String EXTERNAL_RESOURCE_PARAMETER_SUFFIX = 
"param.<params>";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L)
+                       .withDescription("The amount for the external resource 
specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of 
container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this 
key to the resource profile of container request to Yarn. " +
+                               "The value will be set to " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * If configured, Flink will add "resources.limits.&gt;config-key&lt;" 
and "resources.requests.&gt;config-key&lt;" to the main
+        * container of TaskExecutor and set the value to {@link 
ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add 
\"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+                               "to the main container of TaskExecutor and set 
the value to " + EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * User-defined &gt;parameter&lt;s for the external resource identified 
by &gt;resource_name&lt;.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_PARAMETER =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_PARAMETER_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("User-defined <parameter>s for the 
external resource identified by <resource_name>.");
+
+       private static String genericKeyWithSuffix(String suffix) {
+               return keyWithResourceNameAndSuffix("<resource_name>", suffix);
+       }
+
+       /**
+        * Generate the config key with resource_name and suffix.
+        */
+       public static String keyWithResourceNameAndSuffix(String resourceName, 
String suffix) {
+               return String.format("%s.%s.%s", EXTERNAL_RESOURCE_PREFIX, 
resourceName, suffix);

Review comment:
       Better to add `checkNotNull` for the arguments.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /** User-defined params of that external resource. This is used as a 
suffix in an actual config. */
+       private static final String EXTERNAL_RESOURCE_PARAMETER_SUFFIX = 
"param.<params>";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L)
+                       .withDescription("The amount for the external resource 
specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of 
container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this 
key to the resource profile of container request to Yarn. " +
+                               "The value will be set to " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * If configured, Flink will add "resources.limits.&gt;config-key&lt;" 
and "resources.requests.&gt;config-key&lt;" to the main
+        * container of TaskExecutor and set the value to {@link 
ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add 
\"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+                               "to the main container of TaskExecutor and set 
the value to " + EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * User-defined &gt;parameter&lt;s for the external resource identified 
by &gt;resource_name&lt;.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_PARAMETER =

Review comment:
       I'm wondering do we actually need this config option.
   It seems to me this is a suggested pattern for user defined configurations, 
not required. We have not filtered the configurations to make only the options 
following this pattern visible to the driver.
   Maybe just remove this option, and mention in the user document that we 
recommend this pattern for custom parameters to avoid conflict.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /** User-defined params of that external resource. This is used as a 
suffix in an actual config. */
+       private static final String EXTERNAL_RESOURCE_PARAMETER_SUFFIX = 
"param.<params>";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L)
+                       .withDescription("The amount for the external resource 
specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of 
container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this 
key to the resource profile of container request to Yarn. " +
+                               "The value will be set to " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");

Review comment:
       This is a bit ambiguous.
   ```suggestion
                                "The value will be set to the value of " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /** User-defined params of that external resource. This is used as a 
suffix in an actual config. */
+       private static final String EXTERNAL_RESOURCE_PARAMETER_SUFFIX = 
"param.<params>";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L)

Review comment:
       Why would user configure the resource driver while have the amount `0`?
   Maybe `noDefaultValue()`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static Set<String> getExternalResourceSet(Configuration config) {
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       @VisibleForTesting
+       static Map<String, Long> getExternalResources(Configuration config, 
String suffix) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<Long> amountOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                                       .longType()
+                                       .defaultValue(0L);
+                       final ConfigOption<String> configKeyOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String configKey = 
config.getString(configKeyOption);
+                       final Long amount = config.getLong(amountOption);
+
+                       if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+                               LOG.warn("Could not find valid {} for {}", 
configKeyOption.key(), resourceName);
+                               break;
+                       }
+                       if (amount <= 0) {
+                               LOG.warn("The amount of the {} should be 
configured with a positive value.", resourceName);

Review comment:
       Let's separate the two cases that amount is not configured and amount is 
configured with illegal value.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       /** The amount of the external resource per task executor. This is used 
as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_AMOUNT_SUFFIX = "amount";
+
+       /** The driver factory class of the external resource to use. This is 
used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX = 
"driver-factory.class";
+
+       /** Defines the configuration key of that external resource in Yarn. 
This is used as a suffix in an actual config. */
+       public static final String EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX = 
"yarn.config-key";
+
+       /** Defines the configuration key of that external resource in 
Kubernetes. This is used as a suffix in an actual config. */
+       public static final String 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";
+
+       /**
+        * The prefix for all external resources configs. Has to be combined 
with a resource name and
+        * the configs mentioned below.
+        */
+       private static final String EXTERNAL_RESOURCE_PREFIX = 
"external-resource";
+
+       /** User-defined params of that external resource. This is used as a 
suffix in an actual config. */
+       private static final String EXTERNAL_RESOURCE_PARAMETER_SUFFIX = 
"param.<params>";
+
+       /**
+        * List of the resource_name of all external resources with delimiter 
";". The resource_name will be used to
+        * splice related config options for external resource. Only the 
resource_name defined here will go into effect in external
+        * resource framework.
+        */
+       public static final ConfigOption<List<String>> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("List of the <resource_name> of all 
external resources with delimiter \";\". " +
+                               "The <resource_name> will be used to splice 
related config options for external resource. Only the " +
+                               "<resource_name> defined here will go into 
effect by external resource framework.");
+
+       /**
+        * Defines the factory class name for the external resource identified 
by &gt;resource_name&lt;. The factory will be used
+        * to instantiated the {@link 
org.apache.flink.api.common.externalresource.ExternalResourceDriver} at the 
TaskExecutor side.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_DRIVER_FACTORY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Defines the factory class name for 
the external resource identified by <resource_name>. The " +
+                               "factory will be used to instantiated the 
ExternalResourceDriver at the TaskExecutor side.");
+       /**
+        * The amount for the external resource specified by 
&gt;resource_name&lt; per TaskExecutor.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key(genericKeyWithSuffix(EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L)
+                       .withDescription("The amount for the external resource 
specified by <resource_name> per TaskExecutor.");
+
+       /**
+        * If configured, Flink will add this key to the resource profile of 
container request to Yarn. The value will be
+        * set to {@link ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add this 
key to the resource profile of container request to Yarn. " +
+                               "The value will be set to " + 
EXTERNAL_RESOURCE_AMOUNT.key() + ".");
+
+       /**
+        * If configured, Flink will add "resources.limits.&gt;config-key&lt;" 
and "resources.requests.&gt;config-key&lt;" to the main
+        * container of TaskExecutor and set the value to {@link 
ExternalResourceOptions#EXTERNAL_RESOURCE_AMOUNT}.
+        *
+        * <p>It is intentionally included into user docs while unused.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+               
key(genericKeyWithSuffix(EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX))
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("If configured, Flink will add 
\"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
+                               "to the main container of TaskExecutor and set 
the value to " + EXTERNAL_RESOURCE_AMOUNT.key() + ".");

Review comment:
       Same here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static Set<String> getExternalResourceSet(Configuration config) {
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */

Review comment:
       > The key should be used for deployment specific container request, and 
values should be the amount of that resource.
   
   I think this explanation should be added to 
`getExternalResourcesForKubernetes` and `getExternalResourcesForYarn`, because 
those are the pubic interface this class exposes.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";

Review comment:
       Could be `static`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static Set<String> getExternalResourceSet(Configuration config) {
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       @VisibleForTesting
+       static Map<String, Long> getExternalResources(Configuration config, 
String suffix) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<Long> amountOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                                       .longType()
+                                       .defaultValue(0L);
+                       final ConfigOption<String> configKeyOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String configKey = 
config.getString(configKeyOption);
+                       final Long amount = config.getLong(amountOption);
+
+                       if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+                               LOG.warn("Could not find valid {} for {}", 
configKeyOption.key(), resourceName);

Review comment:
       Better to mention in the log that the resource is ignored.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static Set<String> getExternalResourceSet(Configuration config) {
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       @VisibleForTesting
+       static Map<String, Long> getExternalResources(Configuration config, 
String suffix) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<Long> amountOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                                       .longType()
+                                       .defaultValue(0L);
+                       final ConfigOption<String> configKeyOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String configKey = 
config.getString(configKeyOption);
+                       final Long amount = config.getLong(amountOption);

Review comment:
       ```suggestion
                        final long amount = config.getLong(amountOption);
   ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {

Review comment:
       ```suggestion
   class ResourceInformationReflector {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       private ExternalResourceUtils() {
+               throw new UnsupportedOperationException("This class should 
never be instantiated.");
+       }
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static Set<String> getExternalResourceSet(Configuration config) {
+               return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+       }
+
+       /**
+        * Get the external resources map for Kubernetes.
+        */
+       public static Map<String, Long> 
getExternalResourcesForKubernetes(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map for Yarn.
+        */
+       public static Map<String, Long> 
getExternalResourcesForYarn(Configuration config) {
+               return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+       }
+
+       /**
+        * Get the external resources map. The key should be used for 
deployment specific container request,
+        * and values should be the amount of that resource.
+        */
+       @VisibleForTesting
+       static Map<String, Long> getExternalResources(Configuration config, 
String suffix) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<Long> amountOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                                       .longType()
+                                       .defaultValue(0L);
+                       final ConfigOption<String> configKeyOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, suffix))
+                                       .stringType()
+                                       .noDefaultValue();
+                       final String configKey = 
config.getString(configKeyOption);
+                       final Long amount = config.getLong(amountOption);
+
+                       if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
+                               LOG.warn("Could not find valid {} for {}", 
configKeyOption.key(), resourceName);
+                               break;
+                       }
+                       if (amount <= 0) {
+                               LOG.warn("The amount of the {} should be 
configured with a positive value.", resourceName);

Review comment:
       Same here. Better to mention in the log that the resource is ignored.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";

Review comment:
       `suffix` could be `private`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";
+
+       @Test
+       public void testGetExternalResourceList() {

Review comment:
       If we remove this class, we can also remove `@VisibleForTesting` for 
`ExternalResourceUtils#getExternalResourceSet`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";
+
+       @Test
+       public void testGetExternalResourceList() {
+               final Configuration config = new Configuration();
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
resourceList);
+
+               final Set<String> resourceSet = 
ExternalResourceUtils.getExternalResourceSet(config);
+
+               assertThat(resourceSet.size(), is(2));
+               assertTrue(resourceSet.contains(resourceName1));
+               assertTrue(resourceSet.contains(resourceName2));
+       }
+
+       @Test
+       public void testGetExternalResourcesForKubernetes() {

Review comment:
       I would suggest to also set the yarn config key for another resource 
name, and verify it does not appear in the result config map for k8s.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";
+
+       @Test
+       public void testGetExternalResourceList() {
+               final Configuration config = new Configuration();
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
resourceList);
+
+               final Set<String> resourceSet = 
ExternalResourceUtils.getExternalResourceSet(config);
+
+               assertThat(resourceSet.size(), is(2));
+               assertTrue(resourceSet.contains(resourceName1));
+               assertTrue(resourceSet.contains(resourceName2));
+       }
+
+       @Test
+       public void testGetExternalResourcesForKubernetes() {
+               final Configuration config = new Configuration();
+
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
resourceName1);
+               
config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), resourceAmount1);
+               
config.setString(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName1,
 ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX), 
resourceConfigKey1);
+
+               final Map<String, Long> configMap = 
ExternalResourceUtils.getExternalResourcesForKubernetes(config);
+
+               assertThat(configMap.size(), is(1));
+               assertTrue(configMap.containsKey(resourceConfigKey1));
+               assertThat(configMap.get(resourceConfigKey1), 
is(resourceAmount1));
+       }
+
+       @Test
+       public void testGetExternalResourcesForYarn() {

Review comment:
       Same here.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;
+       private Method setResourceInfoMethod;
+       private Method getResourceValueMethod;
+       private Method getInfoNameMethod;
+       private Method getInfoValueMethod;
+       private Method resourceInfoNewInstance;
+       private boolean isYarnResourceTypesAvailable;

Review comment:
       We can keep these fields `final`.
   The trick is to declare some local variables in the constructor, before the 
`try` block, and initialize these fields with the local variables after the 
`catch` block.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -118,7 +120,8 @@ private InternalContainerResource 
createAndMapContainerResource(final WorkerReso
                        
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
                final InternalContainerResource internalContainerResource = new 
InternalContainerResource(
                        
normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), 
minMemMB),
-                       
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), 
minVcore));
+                       
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), 
minVcore),
+                       
ExternalResourceUtils.getExternalResourcesForYarn(flinkConfig));

Review comment:
       There's no need to compute the external resources every time we create a 
`Resource` for a new `WorkerResourceSpec`. We can compute it only once 
initially.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;
+       private Method setResourceInfoMethod;
+       private Method getResourceValueMethod;
+       private Method getInfoNameMethod;
+       private Method getInfoValueMethod;
+       private Method resourceInfoNewInstance;
+       private boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               try {
+                       resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS);
+                       setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       getResourceValueMethod = 
Resource.class.getMethod("getResources");
+                       getInfoNameMethod = 
resourceInfoClass.getMethod("getName");
+                       getInfoValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);

Review comment:
       It would be better to strictly align these variable names with the 
method names. Since there are methods of both `Resource` and `ResourceInfo`, I 
would suggest to also include the class name.
   E.g.,
   - `resourceSetResourceInformationMethod`
   - `resourceInformationGetNameMethod`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";
+
+       @Test
+       public void testGetExternalResourceList() {

Review comment:
       Isn't this already covered by `testGetExternalResourcesForKubernetes` 
and `testGetExternalResourcesForYarn`?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;

Review comment:
       Could be a local variable in the constructor.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;
+       private Method setResourceInfoMethod;
+       private Method getResourceValueMethod;
+       private Method getInfoNameMethod;
+       private Method getInfoValueMethod;
+       private Method resourceInfoNewInstance;
+       private boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               try {
+                       resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS);
+                       setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       getResourceValueMethod = 
Resource.class.getMethod("getResources");
+                       getInfoNameMethod = 
resourceInfoClass.getMethod("getName");
+                       getInfoValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       isYarnResourceTypesAvailable = false;
+               }
+       }
+
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, 
long amount) throws Exception {

Review comment:
       It would be better to handle the exceptions inside this class, so that 
the caller doesn't need to be aware of the version differences.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       private final String resourceName1 = "foo";
+       private final String resourceName2 = "bar";
+       private final String resourceList = resourceName1 + ";" + resourceName2;
+       private final long resourceAmount1 = 2L;
+       private final long resourceAmount2 = 1L;
+       private final String resourceConfigKey1 = "flink1";
+       private final String resourceConfigKey2 = "flink2";
+       final String suffix = "flink.config-key";
+
+       @Test
+       public void testGetExternalResourceList() {
+               final Configuration config = new Configuration();
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), 
resourceList);

Review comment:
       Why not make `resourceList` a `List<String>` and simply use 
`config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIS, resourceList)` here?
   Same for the other occurrences in this class.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
##########
@@ -233,6 +237,70 @@ public void testMatchResourceWithDifferentImplementation() 
{
                assertThat(adapter.getWorkerSpecs(resourceImpl2, strategy), 
contains(workerSpec));
        }
 
+       @Test
+       public void testSetExtendedResourcesWithYarnSupport() throws Exception {
+               Assume.assumeTrue((HadoopUtils.isMinHadoopVersion(2, 10) && 
HadoopUtils.isMaxHadoopVersion(3, 0)) ||
+                       HadoopUtils.isMinHadoopVersion(3, 1));

Review comment:
       I would try to avoid these sort of version assumptions if possible. 
Please refer to the discussion 
[here](https://github.com/apache/flink/pull/11353#discussion_r410858435).
   Alternatively, we can use testing `Resource` implementations, w/ & w/o the 
concerned methods. See `RegisterApplicationMasterResponseReflectorTest` for an 
example.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -145,6 +148,37 @@ boolean resourceWithinMaxAllocation(final 
InternalContainerResource resource) {
                return resource.memory <= maxMemMB && resource.vcores <= 
maxVcore;
        }
 
+       @VisibleForTesting
+       static void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources, Resource resource) {
+               if 
(!ResourceInformationReflector.INSTANCE.isYarnResourceTypesAvailable()) {
+                       LOG.info("Will not request extended resource because 
the underlying YARN does not support it.");
+                       return;
+               }
+
+               try {
+                       for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                               
ResourceInformationReflector.INSTANCE.setResourceInformation(resource, 
externalResource.getKey(), externalResource.getValue());
+                               LOG.info("Successfully request the external 
resource {} with amount {}.", externalResource.getKey(), 
externalResource.getValue());
+                       }
+               } catch (Exception e) {
+                       LOG.error("Error in setting the external resource.", e);
+               }
+       }
+
+       @VisibleForTesting
+       static Map<String, Long> getExternalResourcesIfPossible(Resource 
resource) {
+               if 
(!ResourceInformationReflector.INSTANCE.isYarnResourceTypesAvailable()) {
+                       return Collections.emptyMap();
+               }
+
+               try {
+                       return 
ResourceInformationReflector.INSTANCE.getExternalResources(resource);
+               } catch (Exception e) {
+                       LOG.error("Error in getting the external resource.", e);
+                       return Collections.emptyMap();
+               }
+       }
+

Review comment:
       I think the `WorkerSpecContainerResourceAdapter` should not have 
different behaviors WRT whether the external resources are supported by the 
current Hadoop version.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -113,4 +117,51 @@ private ExternalResourceUtils() {
 
                return externalResourceConfigs;
        }
+
+       /**
+        * Instantiate the {@link ExternalResourceDriver}s for all of enabled 
external resources. {@link ExternalResourceDriver}s
+        * are mapped by its resource name.
+        */
+       public static Map<String, ExternalResourceDriver> 
externalResourceDriversFromConfig(Configuration config, PluginManager 
pluginManager) throws Exception {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Iterator<ExternalResourceDriverFactory> factoryIterator = 
pluginManager.load(ExternalResourceDriverFactory.class);
+               final Map<String, ExternalResourceDriverFactory> 
externalResourceFactories = new HashMap<>();
+               factoryIterator.forEachRemaining(
+                       externalResourceDriverFactory -> {
+                               
externalResourceFactories.put(externalResourceDriverFactory.getClass().getName(),
 externalResourceDriverFactory);
+                       });

Review comment:
       Should we load all the driver factories?
   What if for each configured resource name and factory, we call plugin 
manager to load that driver class? I think that should avoid loading 
unnecessary plugins.
   Moreover, if there are multiple plugins with the configured class names, we 
should load and use only one of them.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -164,4 +194,23 @@ private ExternalResourceUtils() {
 
                return externalResourceDrivers;
        }
+
+       /**
+        * Get the external resource information from environment. Index by the 
resourceName.
+        */
+       public static Map<String, Set<? extends ExternalResourceInfo>> 
getExternalResourceInfo(Map<String, ExternalResourceDriver> 
externalResourceDrivers, Configuration configuration) {
+               final Map<String, Long> externalResourceAmountMap = 
getExternalResourceAmountMap(configuration);
+               final Map<String, Set<? extends ExternalResourceInfo>> 
externalResources = new HashMap<>();
+               for (Map.Entry<String, ExternalResourceDriver> 
externalResourceDriverEntry : externalResourceDrivers.entrySet()) {
+                       final String resourceName = 
externalResourceDriverEntry.getKey();
+                       final ExternalResourceDriver externalResourceDriver = 
externalResourceDriverEntry.getValue();
+                       if 
(externalResourceAmountMap.containsKey(resourceName)) {
+                               final Set<? extends ExternalResourceInfo> 
externalResourceInfos = 
externalResourceDriver.retrieveResourceInfo(externalResourceAmountMap.get(resourceName));
+                               externalResources.put(resourceName, 
externalResourceInfos);
+                       } else {
+                               LOG.error("Could not found illegal amount 
configuration for {}.", resourceName);

Review comment:
       Illegal amount?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;
+       private Method setResourceInfoMethod;
+       private Method getResourceValueMethod;
+       private Method getInfoNameMethod;
+       private Method getInfoValueMethod;
+       private Method resourceInfoNewInstance;
+       private boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               try {
+                       resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS);
+                       setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       getResourceValueMethod = 
Resource.class.getMethod("getResources");
+                       getInfoNameMethod = 
resourceInfoClass.getMethod("getName");
+                       getInfoValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       isYarnResourceTypesAvailable = false;
+               }
+       }
+
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, 
long amount) throws Exception {
+               if (!isYarnResourceTypesAvailable) {
+                       throw new UnsupportedOperationException("Should not try 
to set ResourceInformation because the underlying Yarn does not support it.");
+               }
+               setResourceInfoMethod.invoke(
+                       resource,
+                       resourceName,
+                       resourceInfoNewInstance.invoke(null, resourceName, 
amount));
+       }
+
+       /**
+        * Get the name and value of external resources from the {@link 
Resource}.
+        */
+       Map<String, Long> getExternalResources(Resource resource) throws 
Exception {

Review comment:
       Same here. Let's try to handle the exceptions inside this class.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -162,4 +170,65 @@ public void 
testGetExternalResourcesWithMultipleExternalResource() {
                assertThat(configMap.get(resourceConfigKey1), 
is(resourceAmount1));
                assertThat(configMap.get(resourceConfigKey2), 
is(resourceAmount2));
        }
+
+       @Test
+       public void testConstructExternalResourceDriversFromConfig() throws 
Exception {
+               final Configuration config = new Configuration();
+               final String driverFactoryClassName = 
"org.apache.flink.runtime.externalresource.TestingExternalResourceDriverFactory";

Review comment:
       Could use `TestingExternalResourceDriverFactory.class.getName()`. Same 
for the other occurences.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
##########
@@ -87,6 +95,21 @@ public boolean hasBroadcastVariable(String name) {
                        throw new IllegalArgumentException("The broadcast 
variable with name '" + name + "' has not been set.");
                }
        }
+
+       @Override
+       public <T extends ExternalResourceInfo> Set<T> 
getExternalResourceInfos(String resourceName, Class<T> externalResourceType) {
+               if (!externalResources.containsKey(resourceName)) {
+                       return Collections.emptySet();
+               }
+
+               final Set<? extends ExternalResourceInfo> infoSet = 
externalResources.get(resourceName);
+               final boolean typeNotMatch = infoSet.stream()
+                       .anyMatch(externalResourceInfo -> 
!externalResourceInfo.getClass().isInstance(externalResourceInfo));

Review comment:
       ```suggestion
                        .anyMatch(externalResourceInfo -> ! 
externalResourceType.isInstance(externalResourceInfo));
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/TestingExternalResourceDriver.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * No-op {@link ExternalResourceDriver} for testing purpose.
+ */
+public class TestingExternalResourceDriver implements ExternalResourceDriver {
+
+       public TestingExternalResourceDriver(Configuration config) {
+       }

Review comment:
       Seems to me we don't need this constructor?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+       static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+       /** Class used to set the extended resource. */
+       private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+       private Class<?> resourceInfoClass;
+       private Method setResourceInfoMethod;
+       private Method getResourceValueMethod;
+       private Method getInfoNameMethod;
+       private Method getInfoValueMethod;
+       private Method resourceInfoNewInstance;
+       private boolean isYarnResourceTypesAvailable;
+
+       private ResourceInformationReflector() {
+               try {
+                       resourceInfoClass = Class.forName(RESOURCE_INFO_CLASS);
+                       setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                       getResourceValueMethod = 
Resource.class.getMethod("getResources");
+                       getInfoNameMethod = 
resourceInfoClass.getMethod("getName");
+                       getInfoValueMethod = 
resourceInfoClass.getMethod("getValue");
+                       resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                       isYarnResourceTypesAvailable = true;
+               } catch (Exception e) {
+                       isYarnResourceTypesAvailable = false;
+               }
+       }
+
+       /**
+        * Add the given resourceName and value to the {@link Resource}.
+        */
+       void setResourceInformation(Resource resource, String resourceName, 
long amount) throws Exception {
+               if (!isYarnResourceTypesAvailable) {
+                       throw new UnsupportedOperationException("Should not try 
to set ResourceInformation because the underlying Yarn does not support it.");
+               }
+               setResourceInfoMethod.invoke(
+                       resource,
+                       resourceName,
+                       resourceInfoNewInstance.invoke(null, resourceName, 
amount));
+       }
+
+       /**
+        * Get the name and value of external resources from the {@link 
Resource}.
+        */
+       Map<String, Long> getExternalResources(Resource resource) throws 
Exception {
+               if (!isYarnResourceTypesAvailable) {
+                       throw new UnsupportedOperationException("Should not try 
to get ResourceInformation because the underlying Yarn does not support it.");
+               }
+               final Map<String, Long> externalResources = new HashMap<>();
+               final Object[] externalResourcesInfo = (Object[]) 
getResourceValueMethod.invoke(resource);
+               // The first two element would be cpu and mem.
+               for (int i = 2; i < externalResourcesInfo.length; i++) {
+                       final String name = (String) 
getInfoNameMethod.invoke(externalResourcesInfo[i]);
+                       final long value = (long) 
getInfoValueMethod.invoke(externalResourcesInfo[i]);
+                       externalResources.put(name, value);
+               }
+               return externalResources;
+       }
+
+       /**
+        * Check whether the underlying YARN support to set extended resources. 
Only supported in Hadoop 3.1+ or 2.10+.
+        *
+        * @return Whether we could set extended resources.
+        */
+       boolean isYarnResourceTypesAvailable() {

Review comment:
       I think one of the purpose of this reflector is to make the version 
differences transparent to the outside. That means we probably should not 
expose whether the resource types are available.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -118,7 +120,8 @@ private InternalContainerResource 
createAndMapContainerResource(final WorkerReso
                        
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
                final InternalContainerResource internalContainerResource = new 
InternalContainerResource(
                        
normalize(taskExecutorProcessSpec.getTotalProcessMemorySize().getMebiBytes(), 
minMemMB),
-                       
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), 
minVcore));
+                       
normalize(taskExecutorProcessSpec.getCpuCores().getValue().intValue(), 
minVcore),
+                       
ExternalResourceUtils.getExternalResourcesForYarn(flinkConfig));

Review comment:
       Actually, this might even not belong to this class. Deriving the 
external resources from the configuration should have nothing to do with the 
responsibility of this class, because it does not depend on the 
`WorkerResourceSpec`. We can derived the external resources once in 
`YarnResourceManager`, and pass it to this class as an argument.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
##########
@@ -56,12 +56,17 @@
 
        private FlinkConfMountDecorator flinkConfMountDecorator;
 
+       @Override
+       protected void setupFlinkConfig() {
+               super.setupFlinkConfig();
+
+               this.flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, 
FLINK_CONF_DIR_IN_POD);
+       }
+

Review comment:
       These changes are quite unrelated.
   I would suggest to have a separate hotfix commit for this refactoring 
changes, and its subclasses.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
##########
@@ -140,6 +149,21 @@ public String getOperatorUniqueID() {
                return operatorUniqueID;
        }
 
+       @Override
+       public <T extends ExternalResourceInfo> Set<T> 
getExternalResourceInfos(String resourceName, Class<T> externalResourceType) 
throws Exception {
+               if (!externalResources.containsKey(resourceName)) {
+                       return Collections.emptySet();
+               }
+
+               final Set<? extends ExternalResourceInfo> infoSet = 
externalResources.get(resourceName);
+               final boolean typeNotMatch = infoSet.stream()
+                       .anyMatch(externalResourceInfo -> 
!externalResourceInfo.getClass().isInstance(externalResourceInfo));

Review comment:
       ```suggestion
                        .anyMatch(externalResourceInfo -> 
!externalResourceType.isInstance(externalResourceInfo));
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -118,6 +119,35 @@ private ExternalResourceUtils() {
                return externalResourceConfigs;
        }
 
+       /**
+        * Get the map of resource name and amount of all of enabled external 
resources.
+        */
+       private static Map<String, Long> 
getExternalResourceAmountMap(Configuration config) {
+               final Set<String> resourceSet = getExternalResourceSet(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceAmountMap = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final ConfigOption<Long> amountOption =
+                               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(resourceName, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                                       .longType()
+                                       .noDefaultValue();
+                       final Long amount = config.getLong(amountOption, 0);
+                       if (amount <= 0) {
+                               LOG.warn("The amount of the {} should be 
configured with a positive value.", resourceName);
+                               break;

Review comment:
       Same here. Let's separate the two cases of not having amount and having 
illegal amount. Also, log that the amount is ignored.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##########
@@ -180,21 +219,37 @@ public boolean equals(Object obj) {
                                return true;
                        } else if (obj instanceof InternalContainerResource) {
                                final InternalContainerResource other = 
(InternalContainerResource) obj;
-                               return this.memory == other.memory && 
this.vcores == other.vcores;
+                               return this.memory == other.memory && 
this.vcores == other.vcores && 
this.externalResources.equals(other.externalResources);
                        }
                        return false;
                }
 
                @Override
                public int hashCode() {
+                       final int prime = 31;
                        int result = Integer.hashCode(memory);
-                       result = 31 * result + Integer.hashCode(vcores);
+                       result = prime * result + Integer.hashCode(vcores);
+                       result = prime * result + externalResources.hashCode();
                        return result;
                }
 
                @Override
                public String toString() {
-                       return "<memory:" + memory + ", vCores:" + vcores + ">";
+                       StringBuilder sb = new StringBuilder();
+
+                       sb.append("<memory:")
+                               .append(memory)
+                               .append(", vCores:")
+                               .append(vcores);
+
+                       for (Map.Entry<String, Long> externalResource : 
externalResources.entrySet()) {
+                               sb.append(", ")
+                                       
.append(externalResource.getKey()).append(": ")
+                                       .append(externalResource.getValue());
+                       }

Review comment:
       Shall we sort the external resources? I think it would be helpful to 
always have consistent string representation for an object.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to