xintongsong commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r412724015



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Contains the information of an external resource. Exposed through {@link 
org.apache.flink.api.common.functions.RuntimeContext}.

Review comment:
       ```suggestion
    * Contains the information of an external resource. 
   ```
   I think how the class is exposed should be irrelevant to the class 
definition.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriver.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+import java.util.Set;
+
+/**
+ * Driver which takes the responsibility to manage and provide the information 
of external resource.
+ */
+public interface ExternalResourceDriver {

Review comment:
       I think this is part of the public APIs. It should probably be moved to 
`flink-core` and annotated `@PublicEvolving`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriverFactory.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Factory for {@link ExternalResourceDriver}. Instantiate a driver with 
configuration;
+ */
+public interface ExternalResourceDriverFactory {

Review comment:
       Same here.
   I think this is part of the public APIs. It should probably be moved to 
`flink-core` and annotated `@PublicEvolving`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceDriverFactory.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Factory for {@link ExternalResourceDriver}. Instantiate a driver with 
configuration;

Review comment:
       ```suggestion
    * Factory for {@link ExternalResourceDriver}. Instantiate a driver with 
configuration.
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST =

Review comment:
       It might be better to use `ConfigOption<List<String>>` for this option. 
However, that also means changing the delimiter from `,` to `;`.
   See `YarnConfigOptions#SHIP_DIRECTORIES` for an example.
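
   To illustrate the delimiter change, here is a rough sketch in plain Java. It is not Flink's actual list parsing: `SemicolonListSketch` and `parseList` are hypothetical names, and the trimming and empty-entry handling are my assumptions. If I remember correctly, list-typed options are declared with `stringType().asList()`, but please double check against `ConfigOptions`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in; this is NOT how Flink parses list-typed options internally. */
class SemicolonListSketch {

    /** Splits a raw config value on ';', trims each entry, and drops empty entries. */
    static List<String> parseList(String rawValue) {
        if (rawValue == null) {
            return Collections.emptyList();
        }
        return Arrays.stream(rawValue.split(";"))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .collect(Collectors.toList());
    }
}
```

   Note that with this delimiter a value such as `gpu,fpga` would be treated as a single resource name, so existing `,`-separated values would need to be migrated.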

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("An optional list of used external 
resources with delimiter \",\". If configured, Flink " +
+                               "would check if the relevant configs exist for 
that resource. At the ResourceManager side, Flink will forward " +
+                               "the request to the underlying external 
resource manager. At the TaskExecutor side, Flink will launch the " +
+                               "corresponding ExternalResourceDriver.");
+
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               key("external-resource.<resourceName>.driver-factory.class")

Review comment:
       I think we'd better use `<abc_xyz>` rather than `<abcXyz>` for the 
variables in config keys.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
        //  Utility methods
        // 
------------------------------------------------------------------------
 
+       private void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources) throws ResourceManagerException {
+               for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                       if (externalResource.getValue() == 0) {
+                               continue;
+                       }
+
+                       if (!isYarnResourceTypesAvailable()) {
+                               throw new ResourceManagerException("Could not 
request extended resource because the underlying YARN does not support it.");
+                       }
+
+                       try {
+                               final Class<?> resourceInfoClass = 
Class.forName(RESOURCE_INFO_CLASS);
+                               final Method setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                               final Method resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);

Review comment:
       I think this code should be outside the loop. There is no need to look up the 
`Class` and `Method` objects for each external resource. Only the method 
invocations need to stay inside the loop.
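
   Roughly what I have in mind, as a self-contained sketch. `FakeResource` and `FakeResourceInformation` are hypothetical stand-ins for the YARN classes so that the snippet compiles without Hadoop on the classpath; only the shape of the loop matters here.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for YARN's ResourceInformation. */
class FakeResourceInformation {
    final String name;
    final long value;

    private FakeResourceInformation(String name, long value) {
        this.name = name;
        this.value = value;
    }

    public static FakeResourceInformation newInstance(String name, long value) {
        return new FakeResourceInformation(name, value);
    }
}

/** Hypothetical stand-in for YARN's Resource. */
class FakeResource {
    final Map<String, FakeResourceInformation> infos = new LinkedHashMap<>();

    public void setResourceInformation(String name, FakeResourceInformation info) {
        infos.put(name, info);
    }
}

class HoistedReflectionSketch {

    static void setExternalResources(FakeResource resource, Map<String, Long> externalResources) {
        try {
            // Reflective lookups happen once, before the loop.
            final Class<?> infoClass = Class.forName(FakeResourceInformation.class.getName());
            final Method setInfo =
                FakeResource.class.getMethod("setResourceInformation", String.class, infoClass);
            final Method newInstance = infoClass.getMethod("newInstance", String.class, long.class);

            for (Map.Entry<String, Long> entry : externalResources.entrySet()) {
                // Only the invocations stay inside the loop.
                setInfo.invoke(
                    resource,
                    entry.getKey(),
                    newInstance.invoke(null, entry.getKey(), entry.getValue()));
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Error in setting the external resource.", e);
        }
    }
}
```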

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {

Review comment:
       Let's add a private constructor to make this class non-instantiable.
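
   Something along these lines. `ExampleResourceUtils` and `normalize` are made-up names for illustration, and throwing from the constructor is just one possible convention.

```java
import java.util.Locale;

/** Hypothetical utility class; the name and the helper method are made up for illustration. */
final class ExampleResourceUtils {

    /** Private constructor: the class only hosts static helpers. */
    private ExampleResourceUtils() {
        throw new UnsupportedOperationException("This class should never be instantiated.");
    }

    /** Example static helper, present only so the class has some content. */
    static String normalize(String resourceName) {
        return resourceName.trim().toLowerCase(Locale.ROOT);
    }
}
```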

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("An optional list of used external 
resources with delimiter \",\". If configured, Flink " +
+                               "would check if the relevant configs exist for 
that resource. At the ResourceManager side, Flink will forward " +
+                               "the request to the underlying external 
resource manager. At the TaskExecutor side, Flink will launch the " +
+                               "corresponding ExternalResourceDriver.");
+
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =
+               key("external-resource.<resourceName>.driver-factory.class")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("The ExternalResourceDriverFactory 
class for a specific external resource. At the TaskExecutor side, " +
+                               "ExternalResourceDriver will be instantiated by 
this class with configuration. You must copy the relevant jar into " +
+                               "the `/plugins/<resourceName>` folder of your 
Flink distribution.");
+
+       public static final ConfigOption<Long> EXTERNAL_RESOURCE_AMOUNT =
+               key("external-resource.<resourceName>.amount")
+                       .longType()
+                       .defaultValue(0L)
+                       .withDescription("The amount for a specific external 
resource per TaskExecutor.");
+
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_YARN_CONFIG_KEY =
+               key("external-resource.<resourceName>.yarn.config-key")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Optional config which defines the 
configuration key of that external resource in Yarn. " +
+                               "If you want the Flink to request the external 
resource from Yarn, you need to explicitly set this key. " +
+                               "Only valid for Yarn mode.");
+
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY =
+               key("external-resource.<resourceName>.kubernetes.config-key")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Optional config which defines the 
configuration key of that external resource in Kubernetes. " +
+                               "If you want the Flink to request the external 
resource from Kubernetes, you need to explicitly set this key. " +
+                               "Only valid for Kubernetes mode.");
+
+       public static final ConfigOption<String> REPORTER_CONFIG_PARAMETER =
+               key("external-resource.{resourceName}.param.{params}")

Review comment:
       ```suggestion
                key("external-resource.<resourceName>.param.<paramName>")
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {

Review comment:
       Generally, I think the descriptions of the configuration options should 
be further polished. Each description should focus on the semantics of the 
specific option and how it should be used. Terms and concepts mentioned should 
be explained in detail, e.g., "configuration key in Yarn/Kubernetes", 
"parameter". There is no need to provide much background on how the external 
resource framework works.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String configKey = 
config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + suffix, "");
+                       final long amount = 
config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0);
+                       if (amount != 0 && 
!StringUtils.isNullOrWhitespaceOnly(configKey)) {

Review comment:
       Is a negative amount allowed?
   
   Moreover, we should check not only the sanity of the configured values, but also 
their existence. It's probably ok for the `configKey` to be absent, but 
we should at least log a warning if the `amount` is not configured.
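
   A rough sketch of the kind of checks I mean. A plain `Map` stands in for Flink's `Configuration`, `System.err` stands in for the logger, and `AmountValidationSketch` / `validAmounts` are hypothetical names. Rejecting non-positive amounts is my assumption here, pending the answer to the question above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; a plain Map stands in for Flink's Configuration. */
class AmountValidationSketch {

    /** Returns only the resources whose amount is configured, parseable, and positive. */
    static Map<String, Long> validAmounts(Map<String, String> config, Set<String> resourceNames) {
        final Map<String, Long> result = new LinkedHashMap<>();
        for (String name : resourceNames) {
            final String amountKey = "external-resource." + name + ".amount";
            final String rawAmount = config.get(amountKey);
            if (rawAmount == null) {
                // Existence check: warn instead of silently falling back to 0.
                System.err.println("WARN: " + amountKey + " is not configured, ignoring " + name);
                continue;
            }
            final long amount;
            try {
                amount = Long.parseLong(rawAmount.trim());
            } catch (NumberFormatException e) {
                System.err.println("WARN: " + amountKey + " is not a number: " + rawAmount);
                continue;
            }
            if (amount <= 0) {
                // Assumption: zero and negative amounts are treated as invalid.
                System.err.println("WARN: " + amountKey + " must be positive, but was " + amount);
                continue;
            }
            result.put(name, amount);
        }
        return result;
    }
}
```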

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST =

Review comment:
       JavaDoc is missing.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       @Test
+       public void testGetExternalResourceList() {
+               final Configuration config = new Configuration();
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "foo,bar");
+
+               final Set<String> resourceSet = 
ExternalResourceUtils.getExternalResourceList(config);
+
+               assertThat(resourceSet.size(), is(2));
+               assertTrue(resourceSet.contains("foo"));
+               assertTrue(resourceSet.contains("bar"));
+       }
+
+       @Test
+       public void testGetExternalResourceConfigMap() {

Review comment:
       It would be better to also add test cases for:
   - configKey is not specified (or empty)
   - illegal amount value
   - multiple resources, w/ or w/o config key conflicts

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
        //  Utility methods
        // 
------------------------------------------------------------------------
 
+       private void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources) throws ResourceManagerException {
+               for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                       if (externalResource.getValue() == 0) {
+                               continue;
+                       }

Review comment:
       Can we keep all checks and filters of this kind in 
`ExternalResourceUtils#getExternalResourceConfigMap`? It would be better to 
have the util method return only valid values that can be used directly.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
        //  Utility methods
        // 
------------------------------------------------------------------------
 
+       private void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources) throws ResourceManagerException {
+               for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                       if (externalResource.getValue() == 0) {
+                               continue;
+                       }
+
+                       if (!isYarnResourceTypesAvailable()) {
+                               throw new ResourceManagerException("Could not 
request extended resource because the underlying YARN does not support it.");
+                       }
+
+                       try {
+                               final Class<?> resourceInfoClass = 
Class.forName(RESOURCE_INFO_CLASS);
+                               final Method setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                               final Method resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                               setResourceInfoMethod.invoke(
+                                       this.resource,
+                                       externalResource.getKey(),
+                                       resourceInfoNewInstance.invoke(null, 
externalResource.getKey(), externalResource.getValue()));
+                               log.info("Successfully request the external 
resource {} with amount {}.", externalResource.getKey(), 
externalResource.getValue());
+
+                       } catch (Exception e) {
+                               throw new ResourceManagerException("Error in 
setting the external resource.", e);
+                       }

Review comment:
       I would try to keep the reflection out of `YarnResourceManager` if 
possible. A good example is `RegisterApplicationMasterResponseReflector`, 
which handles the version-related reflection and always returns something safe 
to `YarnResourceManager`.
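
   A minimal sketch of that pattern, not the actual reflector: `ResourceValueReflector`, `NewerResource`, and `OlderResource` are hypothetical names, and the two stand-in classes simulate a library version with and without the optional method.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a library version that has the optional method. */
class NewerResource {
    final Map<String, Long> values = new HashMap<>();

    public void setResourceValue(String name, long value) {
        values.put(name, value);
    }
}

/** Hypothetical stand-in for an older library version that lacks the method. */
class OlderResource {
}

/** Hypothetical reflector: resolves the method once and hides all reflection from callers. */
class ResourceValueReflector {

    /** Null if the underlying class does not provide the method. */
    private final Method setter;

    ResourceValueReflector(Class<?> resourceClass) {
        Method found;
        try {
            found = resourceClass.getMethod("setResourceValue", String.class, long.class);
        } catch (NoSuchMethodException e) {
            found = null;
        }
        this.setter = found;
    }

    boolean isSupported() {
        return setter != null;
    }

    /** Returns false instead of failing when the method is unavailable. */
    boolean trySet(Object resource, String name, long value) {
        if (setter == null) {
            return false;
        }
        try {
            setter.invoke(resource, name, value);
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Error in setting the external resource.", e);
        }
    }
}
```

   The caller then only sees `isSupported()` and `trySet(...)`, and can decide itself whether an unsupported version should fail the request.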

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceConstants.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+/**
+ * This class contains all constants for the external resource framework.
+ */
+public class ExternalResourceConstants {

Review comment:
       For better deduplication, we can even have a util class for generating 
the configuration keys:
   ```
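   public class ExternalResourceConfigUtils {
       // Utility class, not meant to be instantiated.
       private ExternalResourceConfigUtils() {}
   
       // Key with the "<resource_name>" placeholder, as shown in the docs.
       public static String genericKeyWithSuffix(String suffix) {
           return keyWithResourceNameAndSuffix("<resource_name>", suffix);
       }
   
       // Key for a concrete resource, as used at runtime.
       public static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
           return String.format("external-resource.%s.%s", resourceName, suffix);
       }
   }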
   ```
   And then the config options (take amount as an example):
   ```
   key(ExternalResourceConfigUtils.genericKeyWithSuffix(AMOUNT_SUFFIX))
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceConstants.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+/**
+ * This class contains all constants for the external resource framework.
+ */
+public class ExternalResourceConstants {

Review comment:
       I would suggest placing these constants in `ExternalResourceOptions`.
   That way, you can use these constants for assembling the configuration 
option keys. This guarantees that the keys shown in the user docs are consistent 
with those actually used at runtime.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String configKey = 
config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + suffix, "");
+                       final long amount = 
config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0);

Review comment:
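       The `getString(String, String)` and `getLong(String, long)` methods used 
here are deprecated. We could construct local `ConfigOption` variables first and 
read the values through them.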

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {

Review comment:
       This method is very confusing. Neither the method name nor the JavaDoc 
explains that it returns a map from the deployment-specific key to the amount. 
I would suggest naming it simply `getExternalResources`, with more details in 
the JavaDoc explaining that the returned keys should be used for the 
deployment-specific container request, and that the values are the amounts.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
        //  Utility methods
        // 
------------------------------------------------------------------------
 
+       private void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources) throws ResourceManagerException {
+               for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                       if (externalResource.getValue() == 0) {
+                               continue;
+                       }
+
+                       if (!isYarnResourceTypesAvailable()) {
+                               throw new ResourceManagerException("Could not 
request extended resource because the underlying YARN does not support it.");
+                       }

Review comment:
       1. Why is this inside the loop? Shouldn't this check happen before the 
loop?
   2. Throwing the exception here will prevent the RM from being constructed. It 
does not make sense to fail the resource manager when the Yarn version does not 
support external resources. I think it's good enough to log a warning message 
and simply ignore the external resources.
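
   A rough sketch of what I have in mind, using only the JDK so it stands on
   its own. The class and method names are hypothetical, and the boolean flag
   stands in for the result of `isYarnResourceTypesAvailable()`:

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.logging.Logger;

   // Hypothetical helper, not part of the PR.
   final class ExternalResourceRequestSketch {
       private static final Logger LOG =
           Logger.getLogger(ExternalResourceRequestSketch.class.getName());

       /**
        * Returns the external resources that should go into the container request.
        * resourceTypesAvailable stands in for isYarnResourceTypesAvailable().
        */
       static Map<String, Long> selectRequestableResources(
               Map<String, Long> externalResources, boolean resourceTypesAvailable) {
           // Check once, before the loop, and degrade gracefully instead of throwing.
           if (!resourceTypesAvailable) {
               if (!externalResources.isEmpty()) {
                   LOG.warning("Ignoring external resources " + externalResources.keySet()
                       + " because the underlying YARN does not support them.");
               }
               return Collections.emptyMap();
           }

           final Map<String, Long> requestable = new LinkedHashMap<>();
           for (Map.Entry<String, Long> entry : externalResources.entrySet()) {
               if (entry.getValue() != 0) { // zero amounts are skipped, as in the PR
                   requestable.put(entry.getKey(), entry.getValue());
               }
           }
           return requestable;
       }
   }
   ```

   With this shape the RM can still start on an older Hadoop; it just requests
   containers without the external resources.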

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {

Review comment:
       Also, this class should provide deployment-specific util methods, e.g., 
`getExternalResourcesForKubernetes`, `getExternalResourcesForYarn`. This helps 
hide the suffixes from the callers, and saves us from checking the sanity of 
the suffixes.
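
   Something along these lines, as a JDK-only sketch. A plain
   `Map<String, String>` stands in for Flink's `Configuration`, the class name
   is hypothetical, and the suffix values are my assumptions, since the actual
   constants are not shown in this diff:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical sketch; a Map<String, String> stands in for Configuration.
   final class ExternalResourceUtilsSketch {
       // Assumed values, for illustration only.
       private static final String AMOUNT_SUFFIX = "amount";
       private static final String YARN_CONFIG_KEY_SUFFIX = "yarn.config-key";
       private static final String KUBERNETES_CONFIG_KEY_SUFFIX = "kubernetes.config-key";

       /** Maps the Yarn-specific resource key to the configured amount. */
       static Map<String, Long> getExternalResourcesForYarn(
               Map<String, String> config, Iterable<String> resourceNames) {
           return getExternalResources(config, resourceNames, YARN_CONFIG_KEY_SUFFIX);
       }

       /** Maps the Kubernetes-specific resource key to the configured amount. */
       static Map<String, Long> getExternalResourcesForKubernetes(
               Map<String, String> config, Iterable<String> resourceNames) {
           return getExternalResources(config, resourceNames, KUBERNETES_CONFIG_KEY_SUFFIX);
       }

       // The suffix never leaves this class, so callers cannot pass a bad one.
       private static Map<String, Long> getExternalResources(
               Map<String, String> config, Iterable<String> resourceNames, String suffix) {
           final Map<String, Long> result = new LinkedHashMap<>();
           for (String name : resourceNames) {
               final String deploymentKey = config.getOrDefault(key(name, suffix), "");
               final long amount =
                   Long.parseLong(config.getOrDefault(key(name, AMOUNT_SUFFIX), "0"));
               if (amount != 0 && !deploymentKey.isEmpty()) {
                   result.put(deploymentKey, amount);
               }
           }
           return result;
       }

       private static String key(String resourceName, String suffix) {
           return String.format("external-resource.%s.%s", resourceName, suffix);
       }
   }
   ```

   The generic method stays private, so the only public entry points are the
   per-deployment ones.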

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String configKey = 
config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + suffix, "");
+                       final long amount = 
config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0);
+                       if (amount != 0 && 
!StringUtils.isNullOrWhitespaceOnly(configKey)) {
+                               externalResourceConfigs.put(configKey, amount);

Review comment:
       Should `configKey` be trimmed before being used?
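
   The concern is a value such as `" nvidia.com/gpu "`: it passes the
   `isNullOrWhitespaceOnly` check but would end up in the map with the
   surrounding whitespace. A small JDK-only sketch of the normalization I am
   thinking of (class and method names are hypothetical):

   ```java
   import java.util.Optional;

   // Hypothetical helper, not part of the PR.
   final class ConfigKeyNormalizationSketch {
       /** Returns the trimmed key, or empty if nothing usable remains. */
       static Optional<String> normalizeConfigKey(String rawKey) {
           return Optional.ofNullable(rawKey)
               .map(String::trim)
               .filter(key -> !key.isEmpty());
       }
   }
   ```

   The caller would then only put the entry into the map when a normalized key
   is present.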

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {
+
+       public static final ConfigOption<String> EXTERNAL_RESOURCE_LIST =
+               key("external-resources")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("An optional list of used external 
resources with delimiter \",\". If configured, Flink " +
+                               "would check if the relevant configs exist for 
that resource. At the ResourceManager side, Flink will forward " +
+                               "the request to the underlying external 
resource manager. At the TaskExecutor side, Flink will launch the " +
+                               "corresponding ExternalResourceDriver.");
+
+       public static final ConfigOption<String> 
EXTERNAL_RESOURCE_DRIVER_FACTORY_CLASS =

Review comment:
       Better to add `@SuppressWarnings("unused")`, and explain in the JavaDoc 
that the option is intentionally included in the user docs although it is not 
referenced in the code.
   Same for the other options.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
##########
@@ -106,6 +108,10 @@ public double getTaskManagerCPU() {
                return 
containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue();
        }
 
+       public Map<String, Long> getTaskManagerExternalResources() {
+               return 
ExternalResourceUtils.getExternalResourceConfigMap(getFlinkConfiguration(), 
ExternalResourceConstants.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);

Review comment:
       Should this be derived in the constructor, to avoid potentially 
redundant computations?
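
   Roughly like this, as a JDK-only sketch. The class name is hypothetical and
   the `Supplier` stands in for the call to `ExternalResourceUtils`:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;

   // Hypothetical stand-in for KubernetesTaskManagerParameters.
   final class TaskManagerParametersSketch {
       private final Map<String, Long> taskManagerExternalResources;

       TaskManagerParametersSketch(Supplier<Map<String, Long>> externalResourceLookup) {
           // Derive the map once; the copy keeps later changes to the source out.
           this.taskManagerExternalResources =
               Collections.unmodifiableMap(new HashMap<>(externalResourceLookup.get()));
       }

       Map<String, Long> getTaskManagerExternalResources() {
           return taskManagerExternalResources; // no recomputation per call
       }
   }
   ```

   This assumes the configuration does not change after the parameters object
   is created, which I believe is the case here.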

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -495,6 +502,32 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
        //  Utility methods
        // 
------------------------------------------------------------------------
 
+       private void setExternalResourceRequestIfPossible(Map<String, Long> 
externalResources) throws ResourceManagerException {
+               for (Map.Entry<String, Long> externalResource: 
externalResources.entrySet()) {
+                       if (externalResource.getValue() == 0) {
+                               continue;
+                       }
+
+                       if (!isYarnResourceTypesAvailable()) {
+                               throw new ResourceManagerException("Could not 
request extended resource because the underlying YARN does not support it.");
+                       }
+
+                       try {
+                               final Class<?> resourceInfoClass = 
Class.forName(RESOURCE_INFO_CLASS);
+                               final Method setResourceInfoMethod = 
Resource.class.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+                               final Method resourceInfoNewInstance = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+                               setResourceInfoMethod.invoke(
+                                       this.resource,
+                                       externalResource.getKey(),
+                                       resourceInfoNewInstance.invoke(null, 
externalResource.getKey(), externalResource.getValue()));
+                               log.info("Successfully request the external 
resource {} with amount {}.", externalResource.getKey(), 
externalResource.getValue());
+
+                       } catch (Exception e) {
+                               throw new ResourceManagerException("Error in 
setting the external resource.", e);
+                       }

Review comment:
       BTW, do we know which Hadoop versions support external resources?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());

Review comment:
       If we change the type of 
`ExternalResourceOptions#EXTERNAL_RESOURCE_LIST` to 
`ConfigOption<List<String>>`, we can reuse `ConfigUtils#decodeListFromConfig` 
here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest {
+
+       @Test
+       public void testGetExternalResourceList() {
+               final Configuration config = new Configuration();
+               
config.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, "foo,bar");
+
+               final Set<String> resourceSet = 
ExternalResourceUtils.getExternalResourceList(config);
+
+               assertThat(resourceSet.size(), is(2));
+               assertTrue(resourceSet.contains("foo"));
+               assertTrue(resourceSet.contains("bar"));
+       }
+
+       @Test
+       public void testGetExternalResourceConfigMap() {
+               final Configuration config = new Configuration();
+               final String suffix = 
ExternalResourceConstants.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX;

Review comment:
       I would suggest using an arbitrary suffix.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -527,6 +560,21 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
                return new Tuple2<>(host, Integer.valueOf(port));
        }
 
+       /**
+        * Check whether the underlying YARN support to set extended resources.
+        * In case of Hadoop 3.1+ or 2.10+, the ResourceInformation class 
should be available on the classpath.
+        *
+        * @return Whether we could set extended resources.
+        */
+       private static boolean isYarnResourceTypesAvailable() {
+               try {
+                       Class.forName(RESOURCE_INFO_CLASS);
+                       return true;
+               } catch (ClassNotFoundException e) {
+                       return false;
+               }
+       }

Review comment:
       Same here. Let's keep the reflections in a separate class.
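
   As an illustration, the reflective lookups could be gathered in a small
   helper like the one below. This is a JDK-only sketch with hypothetical
   names; the real class would be constructed with `RESOURCE_INFO_CLASS` and
   would also wrap the `setResourceInformation` call:

   ```java
   import java.lang.reflect.Method;
   import java.util.Optional;

   // Hypothetical helper that confines all reflection to one place.
   final class ReflectionProbeSketch {
       private final Class<?> targetClass; // null when the class is not on the classpath

       ReflectionProbeSketch(String className) {
           Class<?> loaded;
           try {
               loaded = Class.forName(className);
           } catch (ClassNotFoundException e) {
               loaded = null;
           }
           this.targetClass = loaded;
       }

       /** Whether the probed class could be loaded. */
       boolean isAvailable() {
           return targetClass != null;
       }

       /** Looks up a public method, or returns empty if the class or method is missing. */
       Optional<Method> findMethod(String name, Class<?>... parameterTypes) {
           if (targetClass == null) {
               return Optional.empty();
           }
           try {
               return Optional.of(targetClass.getMethod(name, parameterTypes));
           } catch (NoSuchMethodException e) {
               return Optional.empty();
           }
       }
   }
   ```

   `YarnResourceManager` would then only ask the helper whether the feature is
   available, without touching `Class.forName` itself.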

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+       // regex pattern to split the defined external resources
+       private static final Pattern resourceListPattern = 
Pattern.compile("\\s*,\\s*");
+
+       /**
+        * Get the enabled external resource list from configuration.
+        */
+       @VisibleForTesting
+       static final Set<String> getExternalResourceList(Configuration config) {
+               return 
resourceListPattern.splitAsStream(config.getString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST,
 ""))
+                       .filter(r -> !r.isEmpty()) // splitting an empty string 
results in an empty string on jdk9+
+                       .collect(Collectors.toSet());
+       }
+
+       /**
+        * Get the config key and value of all of enabled external resources 
from the underlying external resource manager.
+        * Each external resource manager(Yarn / Kubernetes) have their own 
configuration suffix.
+        */
+       public static Map<String, Long> 
getExternalResourceConfigMap(Configuration config, String suffix) {
+               final Set<String> resourceSet = getExternalResourceList(config);
+               LOG.info("Enabled external resources: {}", resourceSet);
+
+               if (resourceSet.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               final Map<String, Long> externalResourceConfigs = new 
HashMap<>();
+               for (String resourceName: resourceSet) {
+                       final String configKey = 
config.getString(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + suffix, "");
+                       final long amount = 
config.getLong(ExternalResourceConstants.EXTERNAL_RESOURCE_PREFIX + 
resourceName + ExternalResourceConstants.EXTERNAL_RESOURCE_AMOUNT_SUFFIX, 0);
+                       if (amount != 0 && 
!StringUtils.isNullOrWhitespaceOnly(configKey)) {
+                               externalResourceConfigs.put(configKey, amount);

Review comment:
       What if the key already exists? Is it possible that the user specified 
the same key for two different resources?
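
   One possible way to handle it, as a JDK-only sketch with hypothetical
   names: keep the first resource that claims a key and log a warning for the
   later ones. Whether to warn or to fail fast is a judgment call; I am not
   claiming this is the only reasonable behavior.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.logging.Logger;

   // Hypothetical helper, not part of the PR.
   final class DuplicateKeyCheckSketch {
       private static final Logger LOG =
           Logger.getLogger(DuplicateKeyCheckSketch.class.getName());

       /**
        * @param keyByResource resource name to deployment-specific config key
        * @param amountByResource resource name to requested amount
        */
       static Map<String, Long> collect(
               Map<String, String> keyByResource, Map<String, Long> amountByResource) {
           final Map<String, Long> result = new LinkedHashMap<>();
           for (Map.Entry<String, String> entry : keyByResource.entrySet()) {
               final String configKey = entry.getValue();
               final long amount = amountByResource.getOrDefault(entry.getKey(), 0L);
               if (amount == 0) {
                   continue;
               }
               // putIfAbsent returns the existing value when the key is already taken.
               if (result.putIfAbsent(configKey, amount) != null) {
                   LOG.warning("Config key '" + configKey + "' is already used by another "
                       + "external resource; ignoring resource '" + entry.getKey() + "'.");
               }
           }
           return result;
       }
   }
   ```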

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
##########
@@ -113,10 +124,12 @@ public void testMainContainerResourceRequirements() {
                final Map<String, Quantity> requests = 
resourceRequirements.getRequests();
                assertEquals(Double.toString(TASK_MANAGER_CPU), 
requests.get("cpu").getAmount());
                assertEquals(TOTAL_PROCESS_MEMORY + "Mi", 
requests.get("memory").getAmount());
+               assertEquals(Long.toString(RESOURCE_AMOUNT), 
requests.get(RESOURCE_CONFIG_KEY).getAmount());
 
                final Map<String, Quantity> limits = 
resourceRequirements.getLimits();
                assertEquals(Double.toString(TASK_MANAGER_CPU), 
limits.get("cpu").getAmount());
                assertEquals(TOTAL_PROCESS_MEMORY + "Mi", 
limits.get("memory").getAmount());
+               assertEquals(Long.toString(RESOURCE_AMOUNT), 
limits.get(RESOURCE_CONFIG_KEY).getAmount());

Review comment:
       Let's add another test case for the external resources. It would be good 
to have each test case verify one thing only.



