wangyang0918 commented on a change in pull request #12455:
URL: https://github.com/apache/flink/pull/12455#discussion_r434416203



##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
##########
@@ -72,7 +81,7 @@ public void testDynamicProperties() throws Exception {
                Assert.assertNotNull(clientFactory);
 
                final Map<String, String> executorConfigMap = executorConfig.toMap();
-               assertEquals(3, executorConfigMap.size());
+               assertEquals(4, executorConfigMap.size());
                assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
                assertEquals("-DappName=foobar", executorConfigMap.get("env.java.opts"));
                assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));

Review comment:
       Since this test is already being changed, it would be better to also add an assertion for `CONF_DIR`:
   
   ```
   assertEquals(tmp.getRoot().getAbsolutePath(), executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
##########
@@ -68,14 +69,14 @@
        private final ExecutorCLI cli;
        private final ClusterClientServiceLoader clusterClientServiceLoader;
 
-       public KubernetesSessionCli(Configuration configuration) {
-               this(configuration, new DefaultClusterClientServiceLoader());
+       public KubernetesSessionCli(Configuration configuration, String configDir) {
+               this(configuration, new DefaultClusterClientServiceLoader(), configDir);
        }
 
-       public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader) {
+       public KubernetesSessionCli(Configuration configuration, ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
                this.baseConfiguration = new UnmodifiableConfiguration(checkNotNull(configuration));
                this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-               this.cli = new ExecutorCLI(baseConfiguration);
+               this.cli = new ExecutorCLI(baseConfiguration, configDir);

Review comment:
       Now that `configDir` is set in the configuration, some of the `CliFrontend.getConfigurationDirectoryFromEnv` calls in the Kubernetes implementation could be replaced with `flinkConfig.get(DeploymentOptionsInternal.CONF_DIR)`, for example in `FlinkConfMountDecorator#getLocalLogConfFiles`.
   
   We could add a new method `String getFlinkConfDir()` to `KubernetesParameters` and implement it in `AbstractKubernetesParameters`.
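
A rough sketch of what that accessor might look like is given here, under heavy assumptions: `SketchConfig` is a plain-map stand-in for Flink's `Configuration`, the key string is a placeholder rather than the real `DeploymentOptionsInternal.CONF_DIR` key, and all `Sketch*` types are hypothetical names that only mirror the shape of `KubernetesParameters` and `AbstractKubernetesParameters`. It is meant to illustrate the idea of reading the directory from the configuration instead of the environment, not to reproduce the actual classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for Flink's Configuration: a plain string map (assumption for this sketch).
final class SketchConfig {
    // Placeholder key; the real key is whatever DeploymentOptionsInternal.CONF_DIR defines.
    static final String CONF_DIR_KEY = "sketch.conf-dir";

    private final Map<String, String> values;

    SketchConfig(Map<String, String> values) {
        this.values = new HashMap<>(values);
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(values.get(key));
    }
}

// Mirrors the proposed addition to KubernetesParameters (hypothetical name).
interface SketchKubernetesParameters {
    String getFlinkConfDir();
}

// Mirrors the proposed implementation in AbstractKubernetesParameters (hypothetical name).
abstract class AbstractSketchKubernetesParameters implements SketchKubernetesParameters {
    protected final SketchConfig flinkConfig;

    AbstractSketchKubernetesParameters(SketchConfig flinkConfig) {
        this.flinkConfig = flinkConfig;
    }

    @Override
    public String getFlinkConfDir() {
        // Read the directory from the configuration rather than from the environment.
        return flinkConfig.get(SketchConfig.CONF_DIR_KEY)
            .orElseThrow(() -> new IllegalStateException("configuration directory is not set"));
    }
}

// Concrete subclass so the sketch can be instantiated.
final class SketchJobManagerParameters extends AbstractSketchKubernetesParameters {
    SketchJobManagerParameters(SketchConfig flinkConfig) {
        super(flinkConfig);
    }
}

final class ConfDirSketch {
    public static void main(String[] args) {
        SketchConfig config = new SketchConfig(
            Collections.singletonMap(SketchConfig.CONF_DIR_KEY, "/opt/flink/conf"));
        SketchKubernetesParameters parameters = new SketchJobManagerParameters(config);
        System.out.println(parameters.getFlinkConfDir());
    }
}
```

With an accessor like this, a decorator such as `FlinkConfMountDecorator` could ask its parameters object for the directory, so the value passed on the CLI and the value used when mounting config files come from the same place.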

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
##########
@@ -51,6 +53,11 @@ public boolean isCompatibleWith(Configuration configuration) {
        @Override
        public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
                checkNotNull(configuration);
+

Review comment:
       The call to `YarnLogConfigUtil.setLogConfigFileInConfig` in `YarnClusterDescriptor#deployApplicationCluster` could be removed.



