This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f6f6271 [FLINK-20143][yarn] Support non-qualified path for Yarn shared lib
f6f6271 is described below
commit f6f6271e8c17f7873ccb4cfe649d0ad35dfde445
Author: wangyang0918 <[email protected]>
AuthorDate: Mon Nov 16 07:52:28 2020 +0100
[FLINK-20143][yarn] Support non-qualified path for Yarn shared lib
This closes #14082.
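
For context on what "non-qualified" means here: an entry in YarnConfigOptions.PROVIDED_LIB_DIRS may now omit the authority (host and port) of the default filesystem, and the patch qualifies the path against fs.defaultFS before the files are registered. The sketch below is illustrative only and is not code from this commit; the class name QualifyPathSketch is made up, the default filesystem and path values are the same placeholders the new test uses, and it assumes the Hadoop HDFS client is on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class QualifyPathSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder default filesystem, mirroring the value in the new UtilsTest.
            Configuration hadoopConf = new Configuration();
            hadoopConf.set("fs.defaultFS", "hdfs://localhost:9000");

            // Scheme present, authority omitted: this is the "non-qualified" case.
            Path provided = new Path("hdfs:///flink/sharedLib");

            // makeQualified() fills in the missing authority from the default filesystem.
            Path qualified = provided.getFileSystem(hadoopConf).makeQualified(provided);

            // Prints hdfs://localhost:9000/flink/sharedLib
            System.out.println(qualified.toUri());
        }
    }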
---
.../src/main/java/org/apache/flink/yarn/Utils.java | 34 ++++++++++++++
.../apache/flink/yarn/YarnClusterDescriptor.java | 16 +------
.../test/java/org/apache/flink/yarn/UtilsTest.java | 54 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 15 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index a28f55b..e1c7665 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -19,11 +19,14 @@
package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.conf.Configuration;
@@ -593,4 +596,35 @@ public final class Utils {
return Resource.newInstance(unitMemMB, unitVcore);
}
+
+	public static List<Path> getQualifiedRemoteSharedPaths(
+			org.apache.flink.configuration.Configuration configuration,
+			YarnConfiguration yarnConfiguration) throws IOException, FlinkException {
+
+		return getRemoteSharedPaths(
+			configuration,
+			pathStr -> {
+				final Path path = new Path(pathStr);
+				return path.getFileSystem(yarnConfiguration).makeQualified(path);
+			});
+	}
+
+	private static List<Path> getRemoteSharedPaths(
+			org.apache.flink.configuration.Configuration configuration,
+			FunctionWithException<String, Path, IOException> strToPathMapper) throws IOException, FlinkException {
+
+		final List<Path> providedLibDirs = ConfigUtils.decodeListFromConfig(
+			configuration,
+			YarnConfigOptions.PROVIDED_LIB_DIRS,
+			strToPathMapper);
+
+		for (Path path : providedLibDirs) {
+			if (!Utils.isRemotePath(path.toString())) {
+				throw new FlinkException(
+					"The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain" +
+					" dirs accessible from all worker nodes, while the \"" + path + "\" is local.");
+			}
+		}
+		return providedLibDirs;
+	}
}
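
As a quick usage note for the helper added above, here is a hedged sketch rather than code from this commit: the class name SharedLibUsageSketch and the directory value are made up, the default filesystem matches the placeholder in the new test, and it assumes the Flink YARN and Hadoop HDFS client classes are on the classpath.

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.yarn.Utils;
    import org.apache.flink.yarn.configuration.YarnConfigOptions;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SharedLibUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            // With this patch the authority may be omitted from the provided lib dirs.
            flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList("hdfs:///flink/libs"));

            YarnConfiguration yarnConfig = new YarnConfiguration();
            yarnConfig.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder default filesystem

            // Remote entries come back qualified against the default filesystem; a local entry
            // such as "file:///flink/libs" makes the call throw a FlinkException instead.
            List<Path> dirs = Utils.getQualifiedRemoteSharedPaths(flinkConfig, yarnConfig);
            System.out.println(dirs); // [hdfs://localhost:9000/flink/libs]
        }
    }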
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index c98139c..32cc7ba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -670,7 +670,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
-		final List<Path> providedLibDirs = getRemoteSharedPaths(configuration);
+		final List<Path> providedLibDirs = Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
		final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(
			fs,
@@ -1052,20 +1052,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
		return fileReplication > 0 ? fileReplication : yarnFileReplication;
}
-	private List<Path> getRemoteSharedPaths(Configuration configuration) throws IOException, FlinkException {
-		final List<Path> providedLibDirs = ConfigUtils.decodeListFromConfig(
-			configuration, YarnConfigOptions.PROVIDED_LIB_DIRS, Path::new);
-
-		for (Path path : providedLibDirs) {
-			if (!Utils.isRemotePath(path.toString())) {
-				throw new FlinkException(
-					"The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain" +
-					" dirs accessible from all worker nodes, while the \"" + path + "\" is local.");
-			}
-		}
-		return providedLibDirs;
-	}
-
	private static String encodeYarnLocalResourceDescriptorListToString(List<YarnLocalResourceDescriptor> resources) {
return String.join(
LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR,
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 7c09430..057a449 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,22 +18,29 @@
package org.apache.flink.yarn;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.List;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/**
* Tests for {@link Utils}.
@@ -88,6 +95,53 @@ public class UtilsTest extends TestLogger {
		verifyUnitResourceVariousSchedulers(yarnConfig, minMem, minVcore, incMem, incVcore);
}
+	@Test
+	public void testSharedLibWithNonQualifiedPath() throws Exception {
+		final String sharedLibPath = "/flink/sharedLib";
+		final String nonQualifiedPath = "hdfs://" + sharedLibPath;
+		final String defaultFs = "hdfs://localhost:9000";
+		final String qualifiedPath = defaultFs + sharedLibPath;
+
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(nonQualifiedPath));
+		final YarnConfiguration yarnConfig = new YarnConfiguration();
+		yarnConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFs);
+
+		final List<org.apache.hadoop.fs.Path> sharedLibs = Utils.getQualifiedRemoteSharedPaths(flinkConfig, yarnConfig);
+		assertThat(sharedLibs.size(), is(1));
+		assertThat(sharedLibs.get(0).toUri().toString(), is(qualifiedPath));
+	}
+
+	@Test
+	public void testSharedLibIsNotRemotePathShouldThrowException() throws IOException {
+		final String localLib = "file:///flink/sharedLib";
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(localLib));
+
+		try {
+			Utils.getQualifiedRemoteSharedPaths(flinkConfig, new YarnConfiguration());
+			fail("We should throw an exception when the shared lib is set to local path.");
+		} catch (FlinkException ex) {
+			final String msg = "The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only " +
+				"contain dirs accessible from all worker nodes";
+
+			boolean found = false;
+			Throwable t = ex;
+			while (t != null) {
+				if (t.getMessage() != null && t.getMessage().contains(msg)) {
+					found = true;
+					break;
+				} else {
+					t = t.getCause();
+				}
+			}
+
+			if (!found) {
+				fail("Could not find expected error message: " + msg);
+			}
+		}
+	}
+
	private static void verifyUnitResourceVariousSchedulers(YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
		yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
		verifyUnitResource(yarnConfig, incMem, incVcore);