AHeise commented on a change in pull request #17580:
URL: https://github.com/apache/flink/pull/17580#discussion_r740480590



##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -26,13 +26,22 @@
 import org.apache.flink.api.java.io.DiscardingOutputFormat;

Review comment:
       Hm, we should probably remove the fallback and fail hard to exactly find 
these kind of issues.

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
         testJobName(jobName, env);
     }
 
+    @Test
+    public void testConfigureFileSystem() throws Exception {
+        final Configuration config = new Configuration();
+        EXPECTED_CONFIGURATION.forEach(config::setString);
+        final StreamExecutionEnvironment environment = new 
LocalStreamEnvironment(config);
+        environment.getCheckpointConfig().setCheckpointStorage("testConfig:/" 
+ tmp.newFolder());
+        environment.enableCheckpointing(100);
+        environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer, 
Integer>) value -> value);
+        // We only want to enable the configuration check for this test case. 
Nevertheless, the
+        // filesystem is loaded and configured for all test cases in this 
module.
+        expectConfiguration = true;

Review comment:
       Fair point. 
   
   On a related note: Does that configuration really need to be volatile? 

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
         testJobName(jobName, env);
     }
 
+    @Test
+    public void testConfigureFileSystem() throws Exception {
+        final Configuration config = new Configuration();
+        EXPECTED_CONFIGURATION.forEach(config::setString);
+        final StreamExecutionEnvironment environment = new 
LocalStreamEnvironment(config);
+        environment.getCheckpointConfig().setCheckpointStorage("testConfig:/" 
+ tmp.newFolder());
+        environment.enableCheckpointing(100);
+        environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer, 
Integer>) value -> value);
+        // We only want to enable the configuration check for this test case. 
Nevertheless, the
+        // filesystem is loaded and configured for all test cases in this 
module.
+        expectConfiguration = true;
+        environment.execute("configureFilesystem");
+        expectConfiguration = false;
+    }
+
     private void testJobName(String prefixOfExpectedJobName, 
ExecutionEnvironment env) {
         env.fromElements(1, 2, 3).writeAsText("/dev/null");
         Plan plan = env.createProgramPlan();
         assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName));
     }
+
+    /** Test filesystem to check that configurations are propagated. */
+    public static final class TestFileSystemFactoryWithConfiguration
+            extends TestFileSystem.TestFileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return "testConfig";
+        }
+
+        @Override
+        public void configure(Configuration config) {
+            if (!expectConfiguration) {
+                return;
+            }
+            EXPECTED_CONFIGURATION.forEach((k, v) -> assertEquals(v, 
config.getString(k, null)));

Review comment:
       ```
   assertEquals(v, config.getString(k, null), "Unexpected config entry for " + 
k)
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -279,6 +282,10 @@ public void start() throws Exception {
                     miniClusterConfiguration.getRpcServiceSharing() == 
RpcServiceSharing.SHARED;
 
             try {
+                final PluginManager pluginManager =
+                        
PluginUtils.createPluginManagerFromRootFolder(configuration);
+                FileSystem.initialize(configuration, pluginManager);

Review comment:
       Would it be safer to move it to the `LocalExecutor`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to