wangyang0918 commented on code in PR #19065:
URL: https://github.com/apache/flink/pull/19065#discussion_r918800030
##########
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNApplicationITCase.java:
##########
@@ -88,6 +88,22 @@ void testApplicationClusterWithRemoteUserJar() throws
Exception {
remoteJar,
YarnConfigOptions.UserJarInclusion.DISABLED)));
}
+ @Test
+ public void testApplicationClusterWithRemoteUsrLib() throws Exception {
Review Comment:
I am not sure what you want to verify in this test. It does not seem to
check that the usrlib is actually loaded.
##########
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##########
@@ -633,17 +635,80 @@ static Resource getUnitResource(YarnConfiguration
yarnConfig) {
return Resource.newInstance(unitMemMB, unitVcore);
}
- public static List<Path> getQualifiedRemoteSharedPaths(
+ private static Path strToPathMapper(String pathStr, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final Path path = new Path(pathStr);
+ return path.getFileSystem(yarnConfiguration).makeQualified(path);
+ }
+
+ public static List<Path> getQualifiedRemoteProvidedLibDirs(
org.apache.flink.configuration.Configuration configuration,
YarnConfiguration yarnConfiguration)
- throws IOException, FlinkException {
+ throws IOException {
+ final List<String> providedDirs =
+ ConfigUtils.decodeListFromConfig(
+ configuration, YarnConfigOptions.PROVIDED_LIB_DIRS,
String::toString);
+ return getQualifiedRemoteSharedPaths(
+ providedDirs, YarnConfigOptions.PROVIDED_LIB_DIRS.key(),
yarnConfiguration);
+ }
+
+ public static boolean isUsrLibIncludedInPath(final FileSystem fileSystem,
final Path path)
+ throws IOException {
+ final FileStatus fileStatus = fileSystem.getFileStatus(path);
+ // Use the Path obj from fileStatus to get rid of trailing slash
+ return fileStatus.isDirectory()
+ &&
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(fileStatus.getPath().getName());
+ }
+
+ public static Path getQualifiedRemoteProvidedUsrLib(
+ org.apache.flink.configuration.Configuration configuration,
+ YarnConfiguration yarnConfiguration)
+ throws IOException, IllegalArgumentException {
+ String usrlib =
configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
+ if (usrlib == null) {
+ return null;
+ }
+ final List<String> providedDirs = Collections.singletonList(usrlib);
+ final List<Path> paths =
+ getQualifiedRemoteSharedPaths(
+ providedDirs,
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ yarnConfiguration);
+ if (paths == null || paths.size() == 0) {
+ return null;
+ }
+ final Path providedUsrLibPath = paths.get(0);
+ checkArgument(
+ isUsrLibIncludedInPath(
+
providedUsrLibPath.getFileSystem(yarnConfiguration),
+ providedUsrLibPath)
+ && isRemotePath(providedUsrLibPath.toString()),
+ "The %s must point to a remote dir named with %s.",
Review Comment:
We already verify that it is a remote path in `validateRemoteSharedPaths`,
and the `checkArgument` below checks `isRemotePath` again. So maybe we do not
need to call `getQualifiedRemoteSharedPaths` here and can qualify the path directly:
```
final Path providedUsrLibPath = strToPathMapper(usrlib,
yarnConfiguration);
checkArgument(
isUsrLibIncludedInPath(
providedUsrLibPath.getFileSystem(yarnConfiguration),
providedUsrLibPath)
&& isRemotePath(providedUsrLibPath.toString()),
"The %s must point to a remote dir named with %s.",
YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
```
##########
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##########
@@ -633,17 +635,80 @@ static Resource getUnitResource(YarnConfiguration
yarnConfig) {
return Resource.newInstance(unitMemMB, unitVcore);
}
- public static List<Path> getQualifiedRemoteSharedPaths(
+ private static Path strToPathMapper(String pathStr, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final Path path = new Path(pathStr);
+ return path.getFileSystem(yarnConfiguration).makeQualified(path);
+ }
+
+ public static List<Path> getQualifiedRemoteProvidedLibDirs(
org.apache.flink.configuration.Configuration configuration,
YarnConfiguration yarnConfiguration)
- throws IOException, FlinkException {
+ throws IOException {
+ final List<String> providedDirs =
+ ConfigUtils.decodeListFromConfig(
+ configuration, YarnConfigOptions.PROVIDED_LIB_DIRS,
String::toString);
+ return getQualifiedRemoteSharedPaths(
+ providedDirs, YarnConfigOptions.PROVIDED_LIB_DIRS.key(),
yarnConfiguration);
+ }
+
+ public static boolean isUsrLibIncludedInPath(final FileSystem fileSystem,
final Path path)
+ throws IOException {
+ final FileStatus fileStatus = fileSystem.getFileStatus(path);
+ // Use the Path obj from fileStatus to get rid of trailing slash
+ return fileStatus.isDirectory()
+ &&
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(fileStatus.getPath().getName());
+ }
+
+ public static Path getQualifiedRemoteProvidedUsrLib(
+ org.apache.flink.configuration.Configuration configuration,
+ YarnConfiguration yarnConfiguration)
+ throws IOException, IllegalArgumentException {
+ String usrlib =
configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
+ if (usrlib == null) {
+ return null;
+ }
+ final List<String> providedDirs = Collections.singletonList(usrlib);
+ final List<Path> paths =
+ getQualifiedRemoteSharedPaths(
+ providedDirs,
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ yarnConfiguration);
+ if (paths == null || paths.size() == 0) {
+ return null;
+ }
Review Comment:
This check is unnecessary since `paths` will never be null or empty.
##########
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##########
@@ -633,17 +635,80 @@ static Resource getUnitResource(YarnConfiguration
yarnConfig) {
return Resource.newInstance(unitMemMB, unitVcore);
}
- public static List<Path> getQualifiedRemoteSharedPaths(
+ private static Path strToPathMapper(String pathStr, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final Path path = new Path(pathStr);
+ return path.getFileSystem(yarnConfiguration).makeQualified(path);
+ }
+
+ public static List<Path> getQualifiedRemoteProvidedLibDirs(
org.apache.flink.configuration.Configuration configuration,
YarnConfiguration yarnConfiguration)
- throws IOException, FlinkException {
+ throws IOException {
+ final List<String> providedDirs =
+ ConfigUtils.decodeListFromConfig(
+ configuration, YarnConfigOptions.PROVIDED_LIB_DIRS,
String::toString);
+ return getQualifiedRemoteSharedPaths(
+ providedDirs, YarnConfigOptions.PROVIDED_LIB_DIRS.key(),
yarnConfiguration);
+ }
+
+ public static boolean isUsrLibIncludedInPath(final FileSystem fileSystem,
final Path path)
+ throws IOException {
+ final FileStatus fileStatus = fileSystem.getFileStatus(path);
+ // Use the Path obj from fileStatus to get rid of trailing slash
+ return fileStatus.isDirectory()
+ &&
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(fileStatus.getPath().getName());
+ }
+
+ public static Path getQualifiedRemoteProvidedUsrLib(
+ org.apache.flink.configuration.Configuration configuration,
+ YarnConfiguration yarnConfiguration)
+ throws IOException, IllegalArgumentException {
+ String usrlib =
configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
+ if (usrlib == null) {
+ return null;
+ }
+ final List<String> providedDirs = Collections.singletonList(usrlib);
+ final List<Path> paths =
+ getQualifiedRemoteSharedPaths(
+ providedDirs,
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ yarnConfiguration);
+ if (paths == null || paths.size() == 0) {
+ return null;
+ }
+ final Path providedUsrLibPath = paths.get(0);
+ checkArgument(
+ isUsrLibIncludedInPath(
+
providedUsrLibPath.getFileSystem(yarnConfiguration),
+ providedUsrLibPath)
+ && isRemotePath(providedUsrLibPath.toString()),
+ "The %s must point to a remote dir named with %s.",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
+ return providedUsrLibPath;
+ }
- return getRemoteSharedPaths(
- configuration,
- pathStr -> {
- final Path path = new Path(pathStr);
- return
path.getFileSystem(yarnConfiguration).makeQualified(path);
- });
+ @VisibleForTesting
+ static List<Path> getQualifiedRemoteSharedPaths(
+ List<String> rawPaths, String configOptionKey, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final List<Path> providedDirs = new ArrayList<>();
+ for (String rawPath : rawPaths) {
+ providedDirs.add(strToPathMapper(rawPath, yarnConfiguration));
+ }
+ return validateRemoteSharedPaths(providedDirs, configOptionKey);
+ }
+
+ private static List<Path> validateRemoteSharedPaths(
+ List<Path> providedDirs, String configOptionKey) throws
IOException {
+ for (Path path : providedDirs) {
+ checkArgument(
+ Utils.isRemotePath(path.toString()),
+ "The %s should only contain dirs accessible from all
worker nodes.",
+ configOptionKey);
+ }
+ return providedDirs;
}
private static List<Path> getRemoteSharedPaths(
Review Comment:
`getRemoteSharedPaths` is no longer used and can be removed.
##########
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java:
##########
@@ -633,17 +635,80 @@ static Resource getUnitResource(YarnConfiguration
yarnConfig) {
return Resource.newInstance(unitMemMB, unitVcore);
}
- public static List<Path> getQualifiedRemoteSharedPaths(
+ private static Path strToPathMapper(String pathStr, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final Path path = new Path(pathStr);
+ return path.getFileSystem(yarnConfiguration).makeQualified(path);
+ }
+
+ public static List<Path> getQualifiedRemoteProvidedLibDirs(
org.apache.flink.configuration.Configuration configuration,
YarnConfiguration yarnConfiguration)
- throws IOException, FlinkException {
+ throws IOException {
+ final List<String> providedDirs =
+ ConfigUtils.decodeListFromConfig(
+ configuration, YarnConfigOptions.PROVIDED_LIB_DIRS,
String::toString);
+ return getQualifiedRemoteSharedPaths(
+ providedDirs, YarnConfigOptions.PROVIDED_LIB_DIRS.key(),
yarnConfiguration);
+ }
+
+ public static boolean isUsrLibIncludedInPath(final FileSystem fileSystem,
final Path path)
+ throws IOException {
+ final FileStatus fileStatus = fileSystem.getFileStatus(path);
+ // Use the Path obj from fileStatus to get rid of trailing slash
+ return fileStatus.isDirectory()
+ &&
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(fileStatus.getPath().getName());
+ }
+
+ public static Path getQualifiedRemoteProvidedUsrLib(
+ org.apache.flink.configuration.Configuration configuration,
+ YarnConfiguration yarnConfiguration)
+ throws IOException, IllegalArgumentException {
+ String usrlib =
configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
+ if (usrlib == null) {
+ return null;
+ }
+ final List<String> providedDirs = Collections.singletonList(usrlib);
+ final List<Path> paths =
+ getQualifiedRemoteSharedPaths(
+ providedDirs,
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ yarnConfiguration);
+ if (paths == null || paths.size() == 0) {
+ return null;
+ }
+ final Path providedUsrLibPath = paths.get(0);
+ checkArgument(
+ isUsrLibIncludedInPath(
+
providedUsrLibPath.getFileSystem(yarnConfiguration),
+ providedUsrLibPath)
+ && isRemotePath(providedUsrLibPath.toString()),
+ "The %s must point to a remote dir named with %s.",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
+ return providedUsrLibPath;
+ }
- return getRemoteSharedPaths(
- configuration,
- pathStr -> {
- final Path path = new Path(pathStr);
- return
path.getFileSystem(yarnConfiguration).makeQualified(path);
- });
+ @VisibleForTesting
+ static List<Path> getQualifiedRemoteSharedPaths(
+ List<String> rawPaths, String configOptionKey, YarnConfiguration
yarnConfiguration)
+ throws IOException {
+ final List<Path> providedDirs = new ArrayList<>();
+ for (String rawPath : rawPaths) {
+ providedDirs.add(strToPathMapper(rawPath, yarnConfiguration));
+ }
+ return validateRemoteSharedPaths(providedDirs, configOptionKey);
+ }
+
+ private static List<Path> validateRemoteSharedPaths(
Review Comment:
Why do we need a separate method for validation? This just makes us run the
for loop twice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]