xintongsong commented on a change in pull request #11920:
URL: https://github.com/apache/flink/pull/11920#discussion_r421930974



##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");

Review comment:
       Here `IllegalArgumentException` might be more descriptive.
   We can just replace this check with `Preconditions.chackArgument`.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");

Review comment:
       Also, better to log the current argument value.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");

Review comment:
       Also, better to point to the related configuration key in the error 
message.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");
+               }
+
+               final Set<GPUInfo> gpuResources = new HashSet<>();
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && !output.isEmpty()) {

Review comment:
       Method `executeDiscoveryScript` is not annotated as `Nullable`. The 
return value should be guaranteed non-null. We should move the null check to 
inside `executeDiscoveryScript`.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");
+               }
+
+               final Set<GPUInfo> gpuResources = new HashSet<>();
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && !output.isEmpty()) {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInfo(index));

Review comment:
       I think we should check the `index` before creating `GPUInfo`. We should 
ignore empty or whitespace-only strings. It's probably also make sense to 
`trim` the string.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");
+               }
+
+               final Set<GPUInfo> gpuResources = new HashSet<>();
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && !output.isEmpty()) {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInfo(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);

Review comment:
       ```suggestion
                LOG.info("Discovered GPU resources: {}.", gpuResources);
   ```

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");
+               }
+
+               final Set<GPUInfo> gpuResources = new HashSet<>();
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && !output.isEmpty()) {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInfo(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);
+               return Collections.unmodifiableSet(gpuResources);
+       }
+
+       private String executeDiscoveryScript(File discoveryScript, long 
gpuAmount, String args) throws Exception {
+               final String cmd = discoveryScript.getAbsolutePath() + " " + 
gpuAmount + " " + args;
+               final Process process = Runtime.getRuntime().exec(cmd);
+               final BufferedReader stdoutReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+               final BufferedReader stderrReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+               final int exitVal = process.waitFor();

Review comment:
       This blocks the thread.
   
   I noticed that this method is indirectly called by 
`ExternalResourceUtils#getExternalResourceInfoInternal`, in a loop. Ideally, 
this should be executed concurrently among all the drivers, and asynchronously 
if needed.
   
   Since we don't have many drivers ATM, and the GPU discovery script does not 
take much time as we manually tested, I'm fine with making this a follow-up 
issue.
   
   For this PR, I think we should at least have a timeout for `waitFor`, in 
case the script is stuck or takes too long.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               key("discovery-script.path")
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               key("discovery-script.args")
+                       .stringType()
+                       .noDefaultValue();
+
+       private final File discoveryScript;
+       private final String args;
+
+       GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkException("Could not find config of the 
path of gpu discovery script.");
+               }
+
+               discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkException("The gpu discovery script does 
not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               args = config.getString(DISCOVERY_SCRIPT_ARG);
+       }
+
+       @Override
+       public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws 
Exception {
+               if (gpuAmount <= 0) {
+                       throw new FlinkException("The amount of GPU should be 
positive.");
+               }
+
+               final Set<GPUInfo> gpuResources = new HashSet<>();
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && !output.isEmpty()) {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInfo(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);
+               return Collections.unmodifiableSet(gpuResources);
+       }
+
+       private String executeDiscoveryScript(File discoveryScript, long 
gpuAmount, String args) throws Exception {
+               final String cmd = discoveryScript.getAbsolutePath() + " " + 
gpuAmount + " " + args;
+               final Process process = Runtime.getRuntime().exec(cmd);
+               final BufferedReader stdoutReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+               final BufferedReader stderrReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+               final int exitVal = process.waitFor();
+               if (exitVal != 0) {
+                       final String stdout = 
stdoutReader.lines().collect(StringBuilder::new, StringBuilder::append, 
StringBuilder::append).toString();
+                       final String stderr = 
stderrReader.lines().collect(StringBuilder::new, StringBuilder::append, 
StringBuilder::append).toString();
+                       LOG.error("Discovery script exit with {}. With output 
{} {}", exitVal, stdout, stderr);
+                       throw new FlinkException("Discovery script exit with 
non-zero.");
+               }
+               return stdoutReader.readLine();

Review comment:
       We should probably check whether the script outputs only one line, is 
this is an explicit contract.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash

Review comment:
       I would suggest to name this `gpu-common.sh` or 
`gpu-discovery-common.sh`.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {

Review comment:
       ```suggestion
   non_privilege_allocate() {
   ```

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then

Review comment:
       Same fo `ASSIGNMENT_FILE` & `CHECK_DEAD_PROCESS` below.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then

Review comment:
       IIUC, the purpose of this script is to provide common util functions. 
Then, we should not depend on global variables declared in another script that 
uses the functions. I would suggest to make `AMOUNT` an argument of the 
function.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then
+      break
+    fi
+    occupy_indexes[${#occupy_indexes[@]}]=$i
+  done

Review comment:
       I think this can be simplified with the following.
   ```
   occupy_indexes=(${indexes[@]:0:$AMOUNT})
   ```

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then
+      break
+    fi
+    occupy_indexes[${#occupy_indexes[@]}]=$i
+  done
+  if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+privilege_allocate() {
+  indexes=$1
+  (
+    flock -x 200
+
+    occupy_indexes=()
+    deny_indexes=()
+    for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -eq $AMOUNT ] || [ `grep -c "^$i " 
$ASSIGNMENT_FILE` -ne 0 ]; then
+        deny_indexes[${#deny_indexes[@]}]=$i
+      else
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done
+
+    if [ "$CHECK_DEAD_PROCESS" == "check" ]; then

Review comment:
       I would suggest to also add `${#occupy_indexes[@]} -ne $AMOUNT` to this 
if condition.
   This improves the readability by aligning with the intuition that we only 
need to check for dead processes if there are not enough available indexes.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then
+      break
+    fi
+    occupy_indexes[${#occupy_indexes[@]}]=$i
+  done
+  if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+privilege_allocate() {
+  indexes=$1
+  (
+    flock -x 200
+
+    occupy_indexes=()
+    deny_indexes=()
+    for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -eq $AMOUNT ] || [ `grep -c "^$i " 
$ASSIGNMENT_FILE` -ne 0 ]; then
+        deny_indexes[${#deny_indexes[@]}]=$i
+      else
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done

Review comment:
       We don't need to loop through all the indexes. We can break the loop 
once we have enough occupy-able indexes.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./nvidia-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"

Review comment:
       Moreover, could you remind me why do we need the option `--check-dead`? 
Or ask in another way, can we remove this option and just always check for the 
gpu occupied by the dead processes?

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./nvidia-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"

Review comment:
       I still think the option names are not very descriptive. It might 
because that the term "privilege mode" itself is not very straightforward.
   
   For the record, the following are the names we came up with in offline 
discussion.
   ```
   --enable-coordination-mode
   --preempt-from-dead-processes
   --coordination-file
   ```
   
   I would also suggest to look for suggestions from people experts in English 
for concise and clear phrases.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-util.sh
##########
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_previlege_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+  do
+    if [ ${#occupy_indexes[@]} -ge $AMOUNT ]; then
+      break
+    fi
+    occupy_indexes[${#occupy_indexes[@]}]=$i
+  done
+  if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+privilege_allocate() {
+  indexes=$1
+  (
+    flock -x 200
+
+    occupy_indexes=()
+    deny_indexes=()

Review comment:
       The names are not very descriptive.
   I would suggest `to_occupy_indexes` and `unavailable_indexes`.
   Maybe also add some comments to explain the semantics.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh
##########
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/../../main/resources/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./testing-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath] [exit-non-zero]"
+  exit 1
+fi
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+PRIVILEGE_OPTION=""
+CHECK_DEAD_PROCESS=""
+EXIT_NON_ZERO=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    shift
+    ;;
+    --exit-non-zero)
+    EXIT_NON_ZERO="exit-non-zero"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+if [ "$EXIT_NON_ZERO" == "exit-non-zero" ]; then
+  exit 1
+fi
+
+indexes=($(seq 0 1 3))
+if [ "$PRIVILEGE_OPTION" == "privilege" ]; then
+  privilege_allocate "${indexes[*]}"
+else
+  non_previlege_allocate "${indexes[*]}"
+fi

Review comment:
       Again, this logic should be moved to `gpu-discovery-util.sh`.

##########
File path: flink-external-resources/flink-external-resource-gpu/pom.xml
##########
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <artifactId>flink-external-resources</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.11-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-external-resource-gpu</artifactId>
+       <name>flink-external-resource-gpu</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+               </dependency>

Review comment:
       Should this be test scope?

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/test-privilege-allocate.sh
##########
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+test_privilege_mode() {
+  IFS=',' read -r -a output1 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege")
+  IFS=',' read -r -a output2 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege")
+
+  if [[ ${#output1[@]} -ne 2 || ${#output2[@]} -ne 2 ]]; then
+    exit 1
+  fi
+
+  for i in output1
+  do
+    for j in output2
+    do
+      if [ $i == $j ]; then
+        exit 1
+      fi
+    done
+  done
+
+  echo ${output1[1]} ${output1[0]}
+  echo ${output2[1]} ${output2[0]}
+  clean_state
+}
+
+test_check_dead() {
+  echo "0" >> /var/tmp/flink-gpu-assignment
+  IFS=',' read -r -a output1 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
+  IFS=',' read -r -a output2 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
+
+  if [[ ${#output1[@]} -ne 2 || ${#output2[@]} -ne 2 ]]; then
+    exit 1
+  fi
+
+  clean_state
+}
+
+test_assign_file() {
+  assign_file=/var/tmp/flink-test-assignment
+  $(dirname "$0")/testing-gpu-discovery.sh 2 --privilege --assign-file 
$assign_file
+
+  if ![ -f $assign_file ]; then
+    clean_state
+    exit 1;
+  fi
+
+  clean_state
+}
+
+clean_state() {
+  rm -f /var/tmp/flink-gpu-assignment
+  rm -f /var/tmp/flink-test-assignment
+}
+
+$@

Review comment:
       I assume this is to execute the testing functions specified by the 
arguments?
   For better readability, let's comment the usage for this script in the 
begining.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/test-privilege-allocate.sh
##########
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+test_privilege_mode() {

Review comment:
       Test case for non-privilege mode is missing.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./nvidia-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+PRIVILEGE_OPTION=""
+CHECK_DEAD_PROCESS=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    shift
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+csv_index=`nvidia-smi --query-gpu=index --format=csv,noheader`
+if [ $? -ne 0 ]; then
+  exit 1
+fi
+IFS=',' read -r -a indexes <<< $(echo $csv_index)
+
+if [ "$PRIVILEGE_OPTION" == "privilege" ]; then
+  privilege_allocate "${indexes[*]}"
+else
+  non_previlege_allocate "${indexes[*]}"
+fi

Review comment:
       I think this logic should belong to `gpu-discovery-util.sh`. In this way 
we can cover this logic with our test cases.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh
##########
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/../../main/resources/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./testing-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath] [exit-non-zero]"
+  exit 1
+fi
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+PRIVILEGE_OPTION=""
+CHECK_DEAD_PROCESS=""
+EXIT_NON_ZERO=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    shift
+    ;;
+    --exit-non-zero)
+    EXIT_NON_ZERO="exit-non-zero"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+if [ "$EXIT_NON_ZERO" == "exit-non-zero" ]; then
+  exit 1
+fi
+
+indexes=($(seq 0 1 3))

Review comment:
       I would suggest to either make the total available gpu amount defined by 
argument, or document in the beginning that this script always pretend having 4 
available gpus.
   I personally prefer the argument approach.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/test-privilege-allocate.sh
##########
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+test_privilege_mode() {
+  IFS=',' read -r -a output1 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege")
+  IFS=',' read -r -a output2 <<< $(bash -c "$(dirname 
"$0")/testing-gpu-discovery.sh 2 --privilege")
+
+  if [[ ${#output1[@]} -ne 2 || ${#output2[@]} -ne 2 ]]; then
+    exit 1
+  fi
+
+  for i in output1
+  do
+    for j in output2
+    do
+      if [ $i == $j ]; then
+        exit 1
+      fi
+    done
+  done
+
+  echo ${output1[1]} ${output1[0]}
+  echo ${output2[1]} ${output2[0]}
+  clean_state
+}
+
+test_check_dead() {
+  echo "0" >> /var/tmp/flink-gpu-assignment

Review comment:
       I think we need to write a pid there.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/main/resources/nvidia-gpu-discovery.sh
##########
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./nvidia-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+PRIVILEGE_OPTION=""
+CHECK_DEAD_PROCESS=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    shift
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift

Review comment:
       I would suggest to move this `shift` to right after reading`$key` fro 
the argument.
   This means also change `$2` to `$1` for the `--assign-file` case.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/resources/testing-gpu-discovery.sh
##########
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/../../main/resources/gpu-discovery-util.sh
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./testing-gpu-discovery.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath] [exit-non-zero]"
+  exit 1
+fi
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+PRIVILEGE_OPTION=""
+CHECK_DEAD_PROCESS=""
+EXIT_NON_ZERO=""
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    shift
+    ;;
+    --exit-non-zero)
+    EXIT_NON_ZERO="exit-non-zero"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+if [ "$EXIT_NON_ZERO" == "exit-non-zero" ]; then
+  exit 1
+fi

Review comment:
       This should be moved before the amount zero check.
   Otherwise, we might exit with 0 while `exit-non-zero` is specified.

##########
File path: 
flink-external-resources/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link GPUDriver}.
+ */
+public class GPUDriverTest extends TestLogger {
+
+       private static final String TESTING_DISCOVERY_SCRIPT_PATH = 
"src/test/resources/testing-gpu-discovery.sh";
+       private static final String TEST_SCRIPT_PATH = 
"src/test/resources/test-privilege-allocate.sh";
+
+       @Test
+       public void testGPUDriverWithTestScript() throws Exception {
+               final int gpuAmount = 2;
+               final Configuration config = new Configuration();
+               config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, 
TESTING_DISCOVERY_SCRIPT_PATH);
+
+               final GPUDriver gpuDriver = new GPUDriver(config);
+               final Set<GPUInfo> gpuResource = 
gpuDriver.retrieveResourceInfo(gpuAmount);
+
+               assertThat(gpuResource.size(), is(gpuAmount));
+       }
+
+       @Test
+       public void testPrivilegeMode() throws Exception {
+               assumeTrue(OperatingSystem.isLinux());
+               testExistWithNonZero("test_privilege_mode");
+       }
+
+       @Test
+       public void testCheckDead() throws Exception {
+               assumeTrue(OperatingSystem.isLinux());
+               testExistWithNonZero("test_check_dead");
+       }
+
+       @Test
+       public void testAssignFile() throws Exception {
+               assumeTrue(OperatingSystem.isLinux());
+               testExistWithNonZero("test_assign_file");
+       }
+
+       private void testExistWithNonZero(String cmd) throws Exception {
+               final ProcessBuilder processBuilder = new 
ProcessBuilder(TEST_SCRIPT_PATH, cmd);
+               processBuilder.redirectErrorStream(true);
+               final Process process = processBuilder.start();
+               final int exitValue = process.waitFor();
+               if (exitValue != 0) {
+                       final BufferedReader stdoutReader = new 
BufferedReader(new InputStreamReader(process.getInputStream()));
+                       final String stdout = 
stdoutReader.lines().collect(StringBuilder::new, StringBuilder::append, 
StringBuilder::append).toString();
+                       log.error("Script exit with non-zero {}, output: {}", 
exitValue, stdout);
+                       throw new Exception("Script exist with non-zero.");
+               }
+       }

Review comment:
       These should belong to another file, with the name 
`GpuDiscoveryScriptTest` or something similar.
   These cases are not testing the behavior of `GPUDriver`, but the behavior of 
`gpu-discovery-util.sh`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to