This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fc533b9d9c1 [FLINK-26047][yarn] Support remote usrlib in HDFS for YARN deployment
fc533b9d9c1 is described below
commit fc533b9d9c1252db124b4c1cb8365b3906b009cf
Author: Biao Geng <[email protected]>
AuthorDate: Fri Mar 11 21:22:52 2022 +0800
[FLINK-26047][yarn] Support remote usrlib in HDFS for YARN deployment
---
.../generated/yarn_config_configuration.html | 6 +++
.../src/main/java/org/apache/flink/yarn/Utils.java | 47 ++++++++++++++---
.../flink/yarn/YarnApplicationFileUploader.java | 6 +--
.../apache/flink/yarn/YarnClusterDescriptor.java | 18 +++++--
.../yarn/configuration/YarnConfigOptions.java | 15 ++++++
.../test/java/org/apache/flink/yarn/UtilsTest.java | 60 +++++++++++++++++++---
6 files changed, 130 insertions(+), 22 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index f59cd63844b..af0156f7bf3 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -134,6 +134,12 @@
<td>List&lt;String&gt;</td>
<td>A semicolon-separated list of provided lib directories. They
should be pre-uploaded and world-readable. Flink will use them to exclude the
local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the
job submission process. Also YARN will cache them on the nodes so that they
doesn't need to be downloaded every time for each application. An example could
be hdfs://$namenode_address/path/of/flink/lib</td>
</tr>
+ <tr>
+ <td><h5>yarn.provided.usrlib.dir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The provided usrlib directory in remote. It should be
pre-uploaded and world-readable. Flink will use it to exclude the local usrlib
directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike
yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each
application. An example could be
hdfs://$namenode_address/path/of/flink/usrlib</td>
+ </tr>
<tr>
<td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
<td style="word-wrap: break-word;">"krb5.keytab"</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 7550715d13e..5d0da8efb9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -19,12 +19,12 @@
package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.token.DelegationTokenConverter;
import org.apache.flink.runtime.util.HadoopUtils;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -67,7 +67,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
@@ -633,12 +635,12 @@ public final class Utils {
return Resource.newInstance(unitMemMB, unitVcore);
}
- public static List<Path> getQualifiedRemoteSharedPaths(
+ public static List<Path> getQualifiedRemoteProvidedLibDirs(
org.apache.flink.configuration.Configuration configuration,
YarnConfiguration yarnConfiguration)
- throws IOException, FlinkException {
+ throws IOException {
- return getRemoteSharedPaths(
+ return getRemoteSharedLibPaths(
configuration,
pathStr -> {
final Path path = new Path(pathStr);
@@ -646,10 +648,10 @@ public final class Utils {
});
}
- private static List<Path> getRemoteSharedPaths(
+ private static List<Path> getRemoteSharedLibPaths(
org.apache.flink.configuration.Configuration configuration,
FunctionWithException<String, Path, IOException> strToPathMapper)
- throws IOException, FlinkException {
+ throws IOException {
final List<Path> providedLibDirs =
ConfigUtils.decodeListFromConfig(
@@ -657,7 +659,7 @@ public final class Utils {
for (Path path : providedLibDirs) {
if (!Utils.isRemotePath(path.toString())) {
- throw new FlinkException(
+ throw new IllegalArgumentException(
"The \""
+ YarnConfigOptions.PROVIDED_LIB_DIRS.key()
+ "\" should only contain"
@@ -669,6 +671,37 @@ public final class Utils {
return providedLibDirs;
}
+ public static boolean isUsrLibDirectory(final FileSystem fileSystem, final
Path path)
+ throws IOException {
+ final FileStatus fileStatus = fileSystem.getFileStatus(path);
+ // Use the Path obj from fileStatus to get rid of trailing slash
+ return fileStatus.isDirectory()
+ &&
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(fileStatus.getPath().getName());
+ }
+
+ public static Optional<Path> getQualifiedRemoteProvidedUsrLib(
+ org.apache.flink.configuration.Configuration configuration,
+ YarnConfiguration yarnConfiguration)
+ throws IOException, IllegalArgumentException {
+ String usrlib =
configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
+ if (usrlib == null) {
+ return Optional.empty();
+ }
+ final Path qualifiedUsrLibPath =
+ FileSystem.get(yarnConfiguration).makeQualified(new
Path(usrlib));
+ checkArgument(
+ isRemotePath(qualifiedUsrLibPath.toString()),
+ "The \"%s\" must point to a remote dir "
+ + "which is accessible from all worker nodes.",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key());
+ checkArgument(
+ isUsrLibDirectory(FileSystem.get(yarnConfiguration),
qualifiedUsrLibPath),
+ "The \"%s\" should be named with \"%s\".",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
+ return Optional.of(qualifiedUsrLibPath);
+ }
+
public static YarnConfiguration getYarnAndHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
index 0e385b1ba51..c03d15e3552 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java
@@ -514,11 +514,7 @@ class YarnApplicationFileUploader implements AutoCloseable {
private boolean isUsrLibDirIncludedInProvidedLib(final List<Path>
providedLibDirs)
throws IOException {
for (Path path : providedLibDirs) {
- final FileStatus fileStatus = fileSystem.getFileStatus(path);
- // Use the Path obj from fileStatus to get rid of trailing slash
- if (fileStatus.isDirectory()
- && ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR.equals(
- fileStatus.getPath().getName())) {
+ if (Utils.isUsrLibDirectory(fileSystem, path)) {
return true;
}
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 04b35098e8b..7f599490aa9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -811,7 +811,10 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
ApplicationSubmissionContext appContext =
yarnApplication.getApplicationSubmissionContext();
final List<Path> providedLibDirs =
- Utils.getQualifiedRemoteSharedPaths(configuration,
yarnConfiguration);
+ Utils.getQualifiedRemoteProvidedLibDirs(configuration,
yarnConfiguration);
+
+ final Optional<Path> providedUsrLibDir =
+ Utils.getQualifiedRemoteProvidedUsrLib(configuration,
yarnConfiguration);
Path stagingDirPath = getStagingDir(fs);
FileSystem stagingDirFs =
stagingDirPath.getFileSystem(yarnConfiguration);
@@ -945,8 +948,17 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
: Path.CUR_DIR,
LocalResourceType.FILE);
- // usrlib will be automatically shipped if it exists.
- if (ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
+ // usrlib in remote will be used first.
+ if (providedUsrLibDir.isPresent()) {
+ final List<String> usrLibClassPaths =
+ fileUploader.registerMultipleLocalResources(
+ Collections.singletonList(providedUsrLibDir.get()),
+ Path.CUR_DIR,
+ LocalResourceType.FILE);
+ userClassPaths.addAll(usrLibClassPaths);
+ } else if
(ClusterEntrypointUtils.tryFindUserLibDirectory().isPresent()) {
+ // local usrlib will be automatically shipped if it exists and
there is no remote
+ // usrlib.
final Set<File> usrLibShipFiles = new HashSet<>();
addUsrLibFolderToShipFiles(usrLibShipFiles);
final List<String> usrLibClassPaths =
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 78a04149ef8..c7ec43bfd48 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -340,6 +340,21 @@ public class YarnConfigOptions {
+ "they doesn't need to be downloaded
every time for each application. An example could be "
+
"hdfs://$namenode_address/path/of/flink/lib");
+ /**
+ * Allows users to directly utilize usrlib directory in HDFS for YARN
application mode. The
+ * classloader for loading jars under the usrlib will be controlled by
{@link
+ * YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR}.
+ */
+ public static final ConfigOption<String> PROVIDED_USRLIB_DIR =
+ key("yarn.provided.usrlib.dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The provided usrlib directory in remote. It
should be pre-uploaded and "
+ + "world-readable. Flink will use it to
exclude the local usrlib directory(i.e. usrlib/ under the parent directory of
FLINK_LIB_DIR)."
+ + " Unlike yarn.provided.lib.dirs, YARN
will not cache it on the nodes as it is for each application. An example could
be "
+ +
"hdfs://$namenode_address/path/of/flink/usrlib");
+
@SuppressWarnings("unused")
public static final ConfigOption<String> HADOOP_CONFIG_KEY =
key("flink.hadoop.<key>")
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index e7956aca991..fd8aadd7b98 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,16 +18,19 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
@@ -98,7 +101,7 @@ class UtilsTest {
yarnConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultFs);
final List<org.apache.hadoop.fs.Path> sharedLibs =
- Utils.getQualifiedRemoteSharedPaths(flinkConfig, yarnConfig);
+ Utils.getQualifiedRemoteProvidedLibDirs(flinkConfig,
yarnConfig);
assertThat(sharedLibs).hasSize(1);
assertThat(sharedLibs.get(0).toUri()).hasToString(qualifiedPath);
}
@@ -115,14 +118,57 @@ class UtilsTest {
+ "\" should only "
+ "contain dirs accessible from all worker nodes";
assertThatThrownBy(
- () -> {
- Utils.getQualifiedRemoteSharedPaths(
- flinkConfig, new YarnConfiguration());
- })
- .isInstanceOf(FlinkException.class)
+ () ->
+ Utils.getQualifiedRemoteProvidedLibDirs(
+ flinkConfig, new YarnConfiguration()))
+ .isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(msg);
}
+ @Test
+ void testInvalidRemoteUsrLib(@TempDir Path tempDir) throws IOException {
+ final String sharedLibPath = "hdfs:///flink/badlib";
+
+ final org.apache.hadoop.conf.Configuration hdConf =
+ new org.apache.hadoop.conf.Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
tempDir.toAbsolutePath().toString());
+ try (final MiniDFSCluster hdfsCluster = new
MiniDFSCluster.Builder(hdConf).build()) {
+ final org.apache.hadoop.fs.Path hdfsRootPath =
+ new org.apache.hadoop.fs.Path(hdfsCluster.getURI());
+ hdfsCluster.getFileSystem().mkdirs(new
org.apache.hadoop.fs.Path(sharedLibPath));
+
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.PROVIDED_USRLIB_DIR,
sharedLibPath);
+ final YarnConfiguration yarnConfig = new YarnConfiguration();
+ yarnConfig.set(
+ CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
hdfsRootPath.toString());
+ assertThatThrownBy(
+ () ->
Utils.getQualifiedRemoteProvidedUsrLib(flinkConfig, yarnConfig))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "The \"%s\" should be named with \"%s\".",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key(),
+ ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
+ }
+ }
+
+ @Test
+ void testSharedUsrLibIsNotRemotePathShouldThrowException(@TempDir Path
tempDir) {
+ final File localLib = new File(tempDir.toAbsolutePath().toString(),
"usrlib");
+ assertThat(localLib.mkdirs()).isTrue();
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.set(YarnConfigOptions.PROVIDED_USRLIB_DIR,
localLib.getAbsolutePath());
+ assertThatThrownBy(
+ () ->
+ Utils.getQualifiedRemoteProvidedUsrLib(
+ flinkConfig, new YarnConfiguration()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "The \"%s\" must point to a remote dir "
+ + "which is accessible from all worker nodes.",
+ YarnConfigOptions.PROVIDED_USRLIB_DIR.key());
+ }
+
@Test
void testGetYarnConfiguration() {
final String flinkPrefix = "flink.yarn.";