xintongsong commented on a change in pull request #11920:
URL: https://github.com/apache/flink/pull/11920#discussion_r422444245



##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInfo.java
##########
@@ -29,12 +30,13 @@
  */
 public class GPUInfo implements ExternalResourceInfo {
 
-       private static final String PROPERTY = "index";
+       private static final String PROPERTY_KEY_INDEX = "index";
 
        private final String index;
 
        GPUInfo(String index) {
-               this.index = Preconditions.checkNotNull(index);
+               Preconditions.checkState(!StringUtils.isNullOrWhitespaceOnly(index));

Review comment:
   nit:
   ```suggestion
               Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(index));
   ```
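Background for this nit, as a self-contained sketch: Flink's `Preconditions.checkArgument` throws `IllegalArgumentException` (the caller passed a bad value), while `checkState` throws `IllegalStateException` (the object is in an invalid state). A blank `index` handed to the constructor is a bad argument. The class `GpuIndexCheck` is a hypothetical stand-in for `GPUInfo`, and the local `isNullOrWhitespaceOnly` mimics Flink's `StringUtils` helper so the example needs no Flink dependency.

```java
// Hypothetical stand-in for GPUInfo, mirroring the suggested validation.
final class GpuIndexCheck {
    private final String index;

    GpuIndexCheck(String index) {
        // Equivalent of Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(index)):
        // an invalid constructor argument is reported as IllegalArgumentException.
        if (isNullOrWhitespaceOnly(index)) {
            throw new IllegalArgumentException("GPU index must not be null or blank.");
        }
        this.index = index;
    }

    String getIndex() {
        return index;
    }

    // Local stand-in for org.apache.flink.util.StringUtils#isNullOrWhitespaceOnly.
    static boolean isNullOrWhitespaceOnly(String str) {
        if (str == null || str.isEmpty()) {
            return true;
        }
        for (int i = 0; i < str.length(); i++) {
            if (!Character.isWhitespace(str.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
```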

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -64,33 +70,37 @@
        GPUDriver(Configuration config) throws Exception {
               final String discoveryScriptPath = config.getString(DISCOVERY_SCRIPT_PATH);
               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
-                       throw new FlinkException("Could not find config of the path of gpu discovery script.");
+                       throw new IllegalConfigurationException(
+                               "Could not find config of the path of gpu discovery script, the relevant config key is {}.", "external-resource.<resource-name>." + DISCOVERY_SCRIPT_ARG.key());

Review comment:
   1. `{}` placeholders do not work when constructing `IllegalConfigurationException`. Use `%s` instead.
   2. We should reuse `ExternalResourceOptions#genericKeyWithSuffix` rather than hard-coding the prefix.
   3. The error message can be more concise. I would suggest `GPU discovery script ('%s') is not configured.`
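A minimal sketch of points 1 to 3 combined, assuming the exception message follows `String.format` semantics as the comment implies. `String.format` leaves a `{}` untouched and silently ignores the unused argument, which is why the original message would print a literal `{}`. `DiscoveryScriptMessage`, its local `genericKeyWithSuffix`, and the suffix `param.discovery-script.path` are hypothetical stand-ins; the real key should come from `ExternalResourceOptions#genericKeyWithSuffix` and the corresponding config option.

```java
// Hypothetical sketch of the message construction suggested in the review.
final class DiscoveryScriptMessage {
    // Illustrative suffix only; the real one should come from the config option's key.
    static final String SCRIPT_PATH_SUFFIX = "param.discovery-script.path";

    // Hypothetical local stand-in for ExternalResourceOptions#genericKeyWithSuffix.
    static String genericKeyWithSuffix(String suffix) {
        return "external-resource.<resource-name>." + suffix;
    }

    static String notConfiguredMessage() {
        // %s is substituted by String.format; a "{}" placeholder would stay in the text verbatim.
        return String.format("GPU discovery script ('%s') is not configured.",
                genericKeyWithSuffix(SCRIPT_PATH_SUFFIX));
    }
}
```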

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -99,13 +109,26 @@ private String executeDiscoveryScript(File discoveryScript, long gpuAmount, Stri
                final Process process = Runtime.getRuntime().exec(cmd);
                final BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                final BufferedReader stderrReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
-               final int exitVal = process.waitFor();
+               final boolean exitInTime = process.waitFor(10000, TimeUnit.MILLISECONDS);

Review comment:
   It's probably not necessary to make this timeout configurable.
   Let's make it a static final constant and reuse it in the exception message below.
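One way to apply this, sketched with plain `java.lang.Process`: a single constant drives both `waitFor(long, TimeUnit)` and the timeout message, so the two cannot drift apart. The names `DiscoveryScriptRunner`, `DISCOVERY_SCRIPT_TIMEOUT_MS`, and `timeoutMessage` are hypothetical; the 10000 ms value is the one hard-coded in the diff.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: one constant for both the wait and the error message.
final class DiscoveryScriptRunner {
    // Illustrative name; value taken from the hard-coded 10000 ms in the PR.
    static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10_000L;

    static String timeoutMessage(String scriptPath) {
        return String.format("The discovery script %s did not finish within %d ms.",
                scriptPath, DISCOVERY_SCRIPT_TIMEOUT_MS);
    }

    static int run(String... cmd) throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(cmd).start();
        // waitFor(long, TimeUnit) returns false if the process is still running when the timeout elapses.
        // Note: a script with large output would also need its stdout/stderr drained while waiting.
        boolean exitedInTime = process.waitFor(DISCOVERY_SCRIPT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (!exitedInTime) {
            process.destroyForcibly();
            throw new TimeoutException(timeoutMessage(cmd[0]));
        }
        return process.exitValue();
    }
}
```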

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -64,33 +70,37 @@
        GPUDriver(Configuration config) throws Exception {
               final String discoveryScriptPath = config.getString(DISCOVERY_SCRIPT_PATH);
               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
-                       throw new FlinkException("Could not find config of the path of gpu discovery script.");
+                       throw new IllegalConfigurationException(
+                               "Could not find config of the path of gpu discovery script, the relevant config key is {}.", "external-resource.<resource-name>." + DISCOVERY_SCRIPT_ARG.key());
                }
 
               discoveryScript = new File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
                        "/" + discoveryScriptPath);
                if (!discoveryScript.exists()) {
-                       throw new FlinkException("The gpu discovery script does not exist in path " + discoveryScript.getAbsolutePath());
+                       throw new FileNotFoundException("The gpu discovery script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+               if (!discoveryScript.canExecute()) {
+                       throw new FlinkException("The discovery script " + discoveryScript.getAbsolutePath() + " is not executable.");
                }
 
                args = config.getString(DISCOVERY_SCRIPT_ARG);
        }
 
        @Override
        public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception {
-               if (gpuAmount <= 0) {
-                       throw new FlinkException("The amount of GPU should be positive.");
-               }
+               Preconditions.checkArgument(gpuAmount > 0, "The gpuAmount should be positive while finding " + gpuAmount);
 
                final Set<GPUInfo> gpuResources = new HashSet<>();
               String output = executeDiscoveryScript(discoveryScript, gpuAmount, args);
-               if (output != null && !output.isEmpty()) {
-                       String[] indexes = output.split(",");
+               if (!output.isEmpty()) {
+                       String[] indexes = output.trim().split(",");

Review comment:
   We should `trim` each element after `split`, rather than trimming the whole output before it.
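The difference matters when the script prints spaces after commas: `"0, 1".trim().split(",")` still yields `" 1"`. A hypothetical helper that trims each element after splitting could look like this; dropping empty entries is an extra assumption, not part of the review.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: split the discovery script output, then trim every element.
final class GpuIndexParser {
    static List<String> parseIndexes(String output) {
        // Trimming only before split() strips whitespace around the whole string,
        // not around each comma-separated entry.
        return Arrays.stream(output.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty()) // assumption: ignore empty entries
                .collect(Collectors.toList());
    }
}
```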

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_coordination_allocate() {
+  indexes=($1)
+  amount=$2
+  to_occupy_indexes=(${indexes[@]:0:$amount})
+  if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+  indexes=$1

Review comment:
   nit:
   I would suggest also surrounding this with `()`, to align with `non_coordination_allocate`.

##########
File path: flink-external-resources/flink-external-resource-gpu/src/test/resources/inexecutable-gpu-discovery.sh
##########
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+exit 1

Review comment:
   I would avoid adding this dummy script to Flink's sources.
   We can just create an arbitrary temporary file without execute permission in the corresponding test case.
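A sketch of the suggested test fixture, assuming a POSIX file system (on Windows, `File#setExecutable(false)` may not be supported). `NonExecutableScriptFixture` is a hypothetical name; in a Flink test, JUnit's `TemporaryFolder` rule would be a natural place to create the file instead. The resulting file should then trip the `canExecute()` check added to `GPUDriver`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical test helper: create a throwaway file instead of committing a script.
final class NonExecutableScriptFixture {
    static File createNonExecutableFile() throws IOException {
        Path path = Files.createTempFile("gpu-discovery", ".sh");
        File file = path.toFile();
        file.deleteOnExit();
        // Clear the execute bit for everyone so the test does not rely on platform defaults.
        if (!file.setExecutable(false, false)) {
            throw new IOException("Could not clear execute permission on " + file);
        }
        return file;
    }
}
```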

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+
+non_coordination_allocate() {
+  indexes=($1)
+  amount=$2
+  to_occupy_indexes=(${indexes[@]:0:$amount})
+  if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+  indexes=$1
+  amount=$2
+  coordination_file=${3:-/var/tmp/flink-gpu-coordination}
+  (
+    flock -x 200
+    # GPU indexes to be occupied. If we finally got enough indexes, index in this array would be recorded in the coordination file.
+    to_occupy_indexes=()
+    # GPU indexes which already recorded in the coordination file. We should not occupy those indexes unless there is not
+    # enough indexes.

Review comment:
   Not sure if we want to describe the variables in this much detail.
   I think what really matters is the semantics of these variables, not how this function works on them.

##########
File path: flink-external-resources/flink-external-resource-gpu/src/test/resources/test-coordination-mode.sh
##########
@@ -34,39 +43,36 @@ test_privilege_mode() {
       fi
     done
   done
-
-  echo ${output1[1]} ${output1[0]}
-  echo ${output2[1]} ${output2[0]}
-  clean_state
 }
 
-test_check_dead() {
-  echo "0" >> /var/tmp/flink-gpu-assignment
-  IFS=',' read -r -a output1 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
-  IFS=',' read -r -a output2 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
+test_preempt_from_dead_processes() {
+  test_pid=$(shuf -i 1-32768 -n 1)
+  while [ $(ps -p $owner | grep -c $owner) -ne 0 ]
+  do
+    test_pid=$(shuf -i 1-32768 -n 1)
+  done

Review comment:
   `test_pid=$(shuf -i 1-32768 -n 1)` can be deduplicated with the following approach:
   ```
   local test_pid
   while [[ -z $test_pid || $(ps -p $test_pid | grep -c $test_pid) -ne 0 ]]
   do
       test_pid=$(shuf -i 1-32768 -n 1)
   done
   ```

##########
File path: flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+
+non_coordination_allocate() {
+  indexes=($1)
+  amount=$2
+  to_occupy_indexes=(${indexes[@]:0:$amount})
+  if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+    echo "Could not get enough GPU resources."
+    exit 1
+  fi
+  echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+  indexes=$1
+  amount=$2
+  coordination_file=${3:-/var/tmp/flink-gpu-coordination}
+  (
+    flock -x 200
+    # GPU indexes to be occupied. If we finally got enough indexes, index in this array would be recorded in the coordination file.
+    to_occupy_indexes=()
+    # GPU indexes which already recorded in the coordination file. We should not occupy those indexes unless there is not
+    # enough indexes.
+    recorded_indexes=()
+    for i in $indexes
+    do
+      if [ ${#to_occupy_indexes[@]} -eq $amount ]; then
+        break
+      elif [ `grep -c "^$i " $coordination_file` -ne 0 ]; then
+        recorded_indexes[${#recorded_indexes[@]}]=$i
+      else
+        to_occupy_indexes[${#to_occupy_indexes[@]}]=$i
+      fi
+    done
+
+    if [ ${#to_occupy_indexes[@]} -ne $amount ]; then
+      for i in ${!recorded_indexes[@]}
+      do
+        if [ ${#to_occupy_indexes[@]} -eq $amount ];then
+          break
+        fi

Review comment:
       I think this can be simplified with a while-loop.



