This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f6f6271  [FLINK-20143][yarn] Support non-qualified path for Yarn shared lib
f6f6271 is described below

commit f6f6271e8c17f7873ccb4cfe649d0ad35dfde445
Author: wangyang0918 <[email protected]>
AuthorDate: Mon Nov 16 07:52:28 2020 +0100

    [FLINK-20143][yarn] Support non-qualified path for Yarn shared lib
    
    This closes #14082.
---
 .../src/main/java/org/apache/flink/yarn/Utils.java | 34 ++++++++++++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 16 +------
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 54 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index a28f55b..e1c7665 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -19,11 +19,14 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.conf.Configuration;
@@ -593,4 +596,35 @@ public final class Utils {
 
                return Resource.newInstance(unitMemMB, unitVcore);
        }
+
+       public static List<Path> getQualifiedRemoteSharedPaths(
+                       org.apache.flink.configuration.Configuration 
configuration,
+                       YarnConfiguration yarnConfiguration) throws 
IOException, FlinkException {
+
+               return getRemoteSharedPaths(
+                               configuration,
+                               pathStr -> {
+                                       final Path path = new Path(pathStr);
+                                       return 
path.getFileSystem(yarnConfiguration).makeQualified(path);
+                               });
+       }
+
+       private static List<Path> getRemoteSharedPaths(
+                       org.apache.flink.configuration.Configuration 
configuration,
+                       FunctionWithException<String, Path, IOException> 
strToPathMapper) throws IOException, FlinkException {
+
+               final List<Path> providedLibDirs = 
ConfigUtils.decodeListFromConfig(
+                       configuration,
+                       YarnConfigOptions.PROVIDED_LIB_DIRS,
+                       strToPathMapper);
+
+               for (Path path : providedLibDirs) {
+                       if (!Utils.isRemotePath(path.toString())) {
+                               throw new FlinkException(
+                                       "The \"" + 
YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain" +
+                                               " dirs accessible from all 
worker nodes, while the \"" + path + "\" is local.");
+                       }
+               }
+               return providedLibDirs;
+       }
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index c98139c..32cc7ba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -670,7 +670,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
 
-               final List<Path> providedLibDirs = 
getRemoteSharedPaths(configuration);
+               final List<Path> providedLibDirs = 
Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
 
                final YarnApplicationFileUploader fileUploader = 
YarnApplicationFileUploader.from(
                        fs,
@@ -1052,20 +1052,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                return fileReplication > 0 ? fileReplication : 
yarnFileReplication;
        }
 
-       private List<Path> getRemoteSharedPaths(Configuration configuration) 
throws IOException, FlinkException {
-               final List<Path> providedLibDirs = 
ConfigUtils.decodeListFromConfig(
-                       configuration, YarnConfigOptions.PROVIDED_LIB_DIRS, 
Path::new);
-
-               for (Path path : providedLibDirs) {
-                       if (!Utils.isRemotePath(path.toString())) {
-                               throw new FlinkException(
-                                               "The \"" + 
YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain" +
-                                                               " dirs 
accessible from all worker nodes, while the \"" + path + "\" is local.");
-                       }
-               }
-               return providedLibDirs;
-       }
-
        private static String 
encodeYarnLocalResourceDescriptorListToString(List<YarnLocalResourceDescriptor> 
resources) {
                return String.join(
                        LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR,
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 7c09430..057a449 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,22 +18,29 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link Utils}.
@@ -88,6 +95,53 @@ public class UtilsTest extends TestLogger {
                verifyUnitResourceVariousSchedulers(yarnConfig, minMem, 
minVcore, incMem, incVcore);
        }
 
+       @Test
+       public void testSharedLibWithNonQualifiedPath() throws Exception {
+               final String sharedLibPath = "/flink/sharedLib";
+               final String nonQualifiedPath = "hdfs://" + sharedLibPath;
+               final String defaultFs = "hdfs://localhost:9000";
+               final String qualifiedPath = defaultFs + sharedLibPath;
+
+               final Configuration flinkConfig = new Configuration();
+               flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, 
Collections.singletonList(nonQualifiedPath));
+               final YarnConfiguration yarnConfig = new YarnConfiguration();
+               
yarnConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFs);
+
+               final List<org.apache.hadoop.fs.Path> sharedLibs = 
Utils.getQualifiedRemoteSharedPaths(flinkConfig, yarnConfig);
+               assertThat(sharedLibs.size(), is(1));
+               assertThat(sharedLibs.get(0).toUri().toString(), 
is(qualifiedPath));
+       }
+
+       @Test
+       public void testSharedLibIsNotRemotePathShouldThrowException() throws 
IOException {
+               final String localLib = "file:///flink/sharedLib";
+               final Configuration flinkConfig = new Configuration();
+               flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, 
Collections.singletonList(localLib));
+
+               try {
+                       Utils.getQualifiedRemoteSharedPaths(flinkConfig, new 
YarnConfiguration());
+                       fail("We should throw an exception when the shared lib 
is set to local path.");
+               } catch (FlinkException ex) {
+                       final String msg = "The \"" + 
YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only " +
+                               "contain dirs accessible from all worker nodes";
+
+                       boolean found = false;
+                       Throwable t = ex;
+                       while (t != null) {
+                               if (t.getMessage() != null && 
t.getMessage().contains(msg)) {
+                                       found = true;
+                                       break;
+                               } else {
+                                       t = t.getCause();
+                               }
+                       }
+
+                       if (!found) {
+                               fail("Could not find expected error message: " 
+ msg);
+                       }
+               }
+       }
+
        private static void 
verifyUnitResourceVariousSchedulers(YarnConfiguration yarnConfig, int minMem, 
int minVcore, int incMem, int incVcore) {
                yarnConfig.set(YarnConfiguration.RM_SCHEDULER, 
Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
                verifyUnitResource(yarnConfig, incMem, incVcore);

Reply via email to