yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 462682f7e5f..f3dd27b5ef0 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -160,6 +162,13 @@ protected int run(String[] args) {
 
                        final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
 
+                       // configure the filesystems
+                       try {
+                               FileSystem.initialize(flinkConfig);
+                       } catch (IOException e) {
+                               throw new IOException("Error while configuring 
the filesystems.", e);
+                       }
+
                        File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
                        if (remoteKeytabPrincipal != null && f.exists()) {
                                String keytabPath = f.getAbsolutePath();
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index b15374b2a4b..929dbdbce3b 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -29,9 +32,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +56,7 @@
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -56,6 +65,8 @@
 /**
  * Tests for the {@link YarnApplicationMasterRunner}.
  */
+@PrepareForTest(FileSystem.class)
+@RunWith(PowerMockRunner.class)
 public class YarnApplicationMasterRunnerTest {
        private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
 
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                        taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
                assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
        }
+
+       @Test
+       public void testRunAndInitializeFileSystem() throws Exception {
+               // Mock necessary system variables
+               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+               map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+               // Create dynamic properties to be used in the Flink 
configuration
+               map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+               CommonTestUtils.setEnv(map);
+
+               // Create a temporary flink-conf.yaml and to be deleted on JVM 
exits
+               File currDir = new 
File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
+               String path = String.format("%s/%s.%s", currDir, "flink-conf", 
"yaml");
+               File f = new File(path);
+               f.createNewFile();
+               f.deleteOnExit();
+
+               // Mock FileSystem.initialize()
+               PowerMockito.mockStatic(FileSystem.class);
+               PowerMockito.doNothing().when(FileSystem.class);
+               FileSystem.initialize(any(Configuration.class));
+
+               String[] args = new String[5];
+               YarnApplicationMasterRunner yarnApplicationMasterRunner = new 
YarnApplicationMasterRunner();
+               yarnApplicationMasterRunner.run(args);
+
+               // Verify FileSystem.initialize() is invoked with the correct 
Flink config
+               ArgumentCaptor<Configuration> propertiesCaptor =
+                       ArgumentCaptor.forClass(Configuration.class);
+               PowerMockito.verifyStatic();
+               FileSystem.initialize(propertiesCaptor.capture());
+               assertEquals("myValue", 
propertiesCaptor.getValue().getString("myKey", ""));
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to