xintongsong commented on a change in pull request #11920:
URL: https://github.com/apache/flink/pull/11920#discussion_r422444245
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInfo.java
##########
@@ -29,12 +30,13 @@
*/
public class GPUInfo implements ExternalResourceInfo {
- private static final String PROPERTY = "index";
+ private static final String PROPERTY_KEY_INDEX = "index";
private final String index;
GPUInfo(String index) {
- this.index = Preconditions.checkNotNull(index);
+ Preconditions.checkState(!StringUtils.isNullOrWhitespaceOnly(index));
Review comment:
nit:
```suggestion
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(index));
```
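For context, a minimal sketch of why `checkArgument` reads better here: the check validates a constructor parameter, not the state of an existing object. The `Checks` and `GpuInfoSketch` classes below are hypothetical stand-ins written only for illustration. They assume that `Preconditions.checkArgument` throws `IllegalArgumentException` and `Preconditions.checkState` throws `IllegalStateException`; they are not the Flink classes themselves.

```java
final class Checks {
    // Stand-in for Preconditions.checkArgument: signals a bad method or constructor argument.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Stand-in for Preconditions.checkState: signals that an object is in an invalid state.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}

final class GpuInfoSketch {
    private final String index;

    GpuInfoSketch(String index) {
        // The value comes from the caller, so a failure here is an argument error.
        Checks.checkArgument(
                index != null && !index.trim().isEmpty(),
                "index must not be null or blank");
        this.index = index;
    }

    String getIndex() {
        return index;
    }
}
```

With this shape, a caller passing a blank index gets an `IllegalArgumentException`, which points at the call site instead of suggesting the object was misused after construction.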
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -64,33 +70,37 @@
GPUDriver(Configuration config) throws Exception {
final String discoveryScriptPath = config.getString(DISCOVERY_SCRIPT_PATH);
if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
- throw new FlinkException("Could not find config of the path of gpu discovery script.");
+ throw new IllegalConfigurationException(
+ "Could not find config of the path of gpu discovery script, the relevant config key is {}.", "external-resource.<resource-name>." + DISCOVERY_SCRIPT_ARG.key());
Review comment:
1. The `{}` placeholder does not work when constructing an
`IllegalConfigurationException`. Use `%s` instead.
2. We should reuse `ExternalResourceOptions#genericKeyWithSuffix` rather
than hard-coding the prefix.
3. The error message can be more concise. I would suggest `GPU discovery
script ('%s') is not configured.`
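To illustrate point 1, here is a minimal sketch of the difference between the two placeholders. It assumes the exception constructor formats its message with `String.format`, which understands `%s` but leaves `{}` untouched. `FormattedConfigException` and `genericKey` are hypothetical names made up for this sketch, and `some.suffix` is a placeholder, not a real config key; in the PR the key should come from `ExternalResourceOptions#genericKeyWithSuffix`.

```java
final class ConfigErrorSketch {
    // Hypothetical stand-in for an exception whose constructor uses String.format.
    static final class FormattedConfigException extends RuntimeException {
        FormattedConfigException(String format, Object... arguments) {
            super(String.format(format, arguments));
        }
    }

    // Hypothetical helper that only mimics building the generic key from a suffix.
    static String genericKey(String suffix) {
        return "external-resource.<resource-name>." + suffix;
    }

    // Builds the concise message suggested in the review, using %s.
    static String conciseMessage(String suffix) {
        return new FormattedConfigException(
                "GPU discovery script ('%s') is not configured.",
                genericKey(suffix)).getMessage();
    }

    // Shows what happens with {}: the argument is silently dropped.
    static String brokenMessage(String suffix) {
        return new FormattedConfigException(
                "the relevant config key is {}.",
                genericKey(suffix)).getMessage();
    }
}
```

In this sketch `brokenMessage` keeps the literal `{}` in its output because `String.format` ignores surplus arguments, so the key never reaches the user.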
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -99,13 +109,26 @@ private String executeDiscoveryScript(File discoveryScript, long gpuAmount, Stri
final Process process = Runtime.getRuntime().exec(cmd);
final BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
final BufferedReader stderrReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
- final int exitVal = process.waitFor();
+ final boolean exitInTime = process.waitFor(10000, TimeUnit.MILLISECONDS);
Review comment:
It's probably not necessary to make this timeout configurable.
Let's make it a `static final` constant and reuse it in the exception message below.
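A minimal sketch of what I have in mind, assuming a constant named `DISCOVERY_SCRIPT_TIMEOUT_MS` (the name, the class, and the message wording are only suggestions). The same constant drives both the `waitFor` call and the error message, so the two cannot drift apart.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DiscoveryTimeoutSketch {
    // Single source of truth for the timeout, shared by the wait and the error message.
    static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

    // Builds the error message from the constant instead of repeating the literal.
    static String timeoutMessage(String command) {
        return String.format(
                "The discovery script '%s' did not finish within %d ms.",
                command, DISCOVERY_SCRIPT_TIMEOUT_MS);
    }

    // Runs the command and fails with a TimeoutException if it does not exit in time.
    static int runWithTimeout(List<String> command)
            throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(command).start();
        if (!process.waitFor(DISCOVERY_SCRIPT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            // Do not leave a hanging script behind.
            process.destroyForcibly();
            throw new TimeoutException(timeoutMessage(String.join(" ", command)));
        }
        return process.exitValue();
    }
}
```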
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -64,33 +70,37 @@
GPUDriver(Configuration config) throws Exception {
final String discoveryScriptPath = config.getString(DISCOVERY_SCRIPT_PATH);
if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
- throw new FlinkException("Could not find config of the path of gpu discovery script.");
+ throw new IllegalConfigurationException(
+ "Could not find config of the path of gpu discovery script, the relevant config key is {}.", "external-resource.<resource-name>." + DISCOVERY_SCRIPT_ARG.key());
}
discoveryScript = new File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") + "/" + discoveryScriptPath);
if (!discoveryScript.exists()) {
- throw new FlinkException("The gpu discovery script does not exist in path " + discoveryScript.getAbsolutePath());
+ throw new FileNotFoundException("The gpu discovery script does not exist in path " + discoveryScript.getAbsolutePath());
+ }
+ if (!discoveryScript.canExecute()) {
+ throw new FlinkException("The discovery script " + discoveryScript.getAbsolutePath() + " is not executable.");
}
args = config.getString(DISCOVERY_SCRIPT_ARG);
}
@Override
public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception {
- if (gpuAmount <= 0) {
- throw new FlinkException("The amount of GPU should be positive.");
- }
+ Preconditions.checkArgument(gpuAmount > 0, "The gpuAmount should be positive while finding " + gpuAmount);
final Set<GPUInfo> gpuResources = new HashSet<>();
String output = executeDiscoveryScript(discoveryScript, gpuAmount, args);
- if (output != null && !output.isEmpty()) {
- String[] indexes = output.split(",");
+ if (!output.isEmpty()) {
+ String[] indexes = output.trim().split(",");
Review comment:
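We should `trim` each element after `split`. Trimming only the whole output
leaves any whitespace around the individual indexes in place.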
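A minimal sketch of splitting first and trimming each element afterwards. The class and method names are hypothetical, and dropping empty entries is an extra assumption on my side, not something the PR requires.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

final class IndexParsingSketch {
    // Splits on commas, then trims every element so " 1 " becomes "1".
    static Set<String> parseIndexes(String output) {
        return Arrays.stream(output.split(","))
                .map(String::trim)
                // Assumption: blank entries (e.g. from a trailing comma) are skipped.
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

By contrast, `" 0, 1".trim().split(",")` yields `"0"` and `" 1"`, so the second index still carries a leading space.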
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_coordination_allocate() {
+ indexes=($1)
+ amount=$2
+ to_occupy_indexes=(${indexes[@]:0:$amount})
+ if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+ echo "Could not get enough GPU resources."
+ exit 1
+ fi
+ echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+ indexes=$1
Review comment:
nit:
I would suggest also surrounding this with `()`, to align with
`non_coordination_allocate`.
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/test/resources/inexecutable-gpu-discovery.sh
##########
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+exit 1
Review comment:
I would avoid adding this empty script to Flink's sources.
We can just create an arbitrary tmp file without execution permission in the
corresponding test case.
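A minimal sketch of creating such a file inside the test, using plain JDK calls. The class and method names are hypothetical; the actual test may prefer JUnit's temporary folder support instead.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class NonExecutableFileSketch {
    // Creates an empty temporary file and makes sure nobody may execute it.
    static File createNonExecutableTempFile() {
        try {
            File file = Files.createTempFile("gpu-discovery", ".sh").toFile();
            file.deleteOnExit();
            // Clear the execute bit for owner, group and others.
            file.setExecutable(false, false);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The test can then point the discovery script path at this file and expect the driver to reject it as not executable.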
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_coordination_allocate() {
+ indexes=($1)
+ amount=$2
+ to_occupy_indexes=(${indexes[@]:0:$amount})
+ if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+ echo "Could not get enough GPU resources."
+ exit 1
+ fi
+ echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+ indexes=$1
+ amount=$2
+ coordination_file=${3:-/var/tmp/flink-gpu-coordination}
+ (
+ flock -x 200
+ # GPU indexes to be occupied. If we finally got enough indexes, index in this array would be recorded in the coordination file.
+ to_occupy_indexes=()
+ # GPU indexes which already recorded in the coordination file. We should not occupy those indexes unless there is not
+ # enough indexes.
Review comment:
I'm not sure we want to describe the variables in this much detail.
I think what really matters is the semantics of these variables, not how
this function operates on them.
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/test/resources/test-coordination-mode.sh
##########
@@ -34,39 +43,36 @@ test_privilege_mode() {
fi
done
done
-
- echo ${output1[1]} ${output1[0]}
- echo ${output2[1]} ${output2[0]}
- clean_state
}
-test_check_dead() {
- echo "0" >> /var/tmp/flink-gpu-assignment
- IFS=',' read -r -a output1 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
- IFS=',' read -r -a output2 <<< $(bash -c "$(dirname "$0")/testing-gpu-discovery.sh 2 --privilege --check-dead")
+test_preempt_from_dead_processes() {
+ test_pid=$(shuf -i 1-32768 -n 1)
+ while [ $(ps -p $owner | grep -c $owner) -ne 0 ]
+ do
+ test_pid=$(shuf -i 1-32768 -n 1)
+ done
Review comment:
`test_pid=$(shuf -i 1-32768 -n 1)` can be deduplicated with the
following approach.
```
local test_pid
while [[ -z $test_pid || $(ps -p $owner | grep -c $owner) -ne 0 ]]
do
test_pid=$(shuf -i 1-32768 -n 1)
done
```
##########
File path:
flink-external-resources/flink-external-resource-gpu/src/main/resources/gpu-discovery-common.sh
##########
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+non_coordination_allocate() {
+ indexes=($1)
+ amount=$2
+ to_occupy_indexes=(${indexes[@]:0:$amount})
+ if [ $amount -gt ${#to_occupy_indexes[@]} ]; then
+ echo "Could not get enough GPU resources."
+ exit 1
+ fi
+ echo ${to_occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+coordination_allocate() {
+ indexes=$1
+ amount=$2
+ coordination_file=${3:-/var/tmp/flink-gpu-coordination}
+ (
+ flock -x 200
+ # GPU indexes to be occupied. If we finally got enough indexes, index in this array would be recorded in the coordination file.
+ to_occupy_indexes=()
+ # GPU indexes which already recorded in the coordination file. We should not occupy those indexes unless there is not
+ # enough indexes.
+ recorded_indexes=()
+ for i in $indexes
+ do
+ if [ ${#to_occupy_indexes[@]} -eq $amount ]; then
+ break
+ elif [ `grep -c "^$i " $coordination_file` -ne 0 ]; then
+ recorded_indexes[${#recorded_indexes[@]}]=$i
+ else
+ to_occupy_indexes[${#to_occupy_indexes[@]}]=$i
+ fi
+ done
+
+ if [ ${#to_occupy_indexes[@]} -ne $amount ]; then
+ for i in ${!recorded_indexes[@]}
+ do
+ if [ ${#to_occupy_indexes[@]} -eq $amount ];then
+ break
+ fi
Review comment:
I think this can be simplified with a while-loop.