Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5225#discussion_r160110442
  
    --- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
    @@ -0,0 +1,293 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.client.cli.CliFrontendParser;
    +import org.apache.flink.client.deployment.ClusterSpecification;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.TestLogger;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +import org.apache.flink.yarn.configuration.YarnConfigOptions;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.DefaultParser;
    +import org.apache.commons.cli.Options;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.StandardOpenOption;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for the {@link FlinkYarnSessionCli}.
    + */
    +public class FlinkYarnSessionCliTest extends TestLogger {
    +
    +   private static final ApplicationId TEST_YARN_APPLICATION_ID = 
ApplicationId.newInstance(System.currentTimeMillis(), 42);
    +
    +   private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = 
ApplicationId.newInstance(System.currentTimeMillis(), 43);
    +
    +   private static final String TEST_YARN_JOB_MANAGER_ADDRESS = 
"22.33.44.55";
    +   private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    +
    +   private static final String validPropertiesFile = "applicationID=" + 
TEST_YARN_APPLICATION_ID;
    +
    +   private static final String invalidPropertiesFile = "jasfobManager=" + 
TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
    +
    +   @Rule
    +   public TemporaryFolder tmp = new TemporaryFolder();
    +
    +   @Test
    +   public void testDynamicProperties() throws Exception {
    +
    +           FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    +                   "",
    +                   "",
    +                   false);
    +           Options options = new Options();
    +           cli.addGeneralOptions(options);
    +           cli.addRunOptions(options);
    +
    +           CommandLineParser parser = new DefaultParser();
    +           CommandLine cmd = parser.parse(options, new String[]{"run", 
"-j", "fake.jar", "-n", "15",
    +                           "-D", "akka.ask.timeout=5 min", "-D", 
"env.java.opts=-DappName=foobar"});
    +
    +           AbstractYarnClusterDescriptor flinkYarnDescriptor = 
cli.createDescriptor(
    +                   new Configuration(),
    +                   tmp.getRoot().getAbsolutePath(),
    +                   null,
    +                   cmd);
    +
    +           Assert.assertNotNull(flinkYarnDescriptor);
    +
    +           Map<String, String> dynProperties =
    +                   
FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
    +           assertEquals(2, dynProperties.size());
    +           assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
    +           assertEquals("-DappName=foobar", 
dynProperties.get("env.java.opts"));
    +   }
    +
    +   @Test
    +   public void testNotEnoughTaskSlots() throws Exception {
    +           String[] params =
    +                   new String[] {"-yn", "2", "-ys", "3", "-p", "7"};
    +
    +           FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", 
"yarn");
    +
    +           Options options = new Options();
    +           // TODO: Nasty workaround: We should get rid of the YarnCLI and 
run options coupling
    +           options.addOption(CliFrontendParser.PARALLELISM_OPTION);
    +           yarnCLI.addGeneralOptions(options);
    +           yarnCLI.addRunOptions(options);
    +
    +           final CommandLine commandLine = 
CliFrontendParser.parse(options, params, true);
    +
    +           ClusterSpecification clusterSpecification = 
yarnCLI.createClusterSpecification(new Configuration(), commandLine);
    +
    +           // each task manager has 3 slots but the parallelism is 7. Thus 
the slots should be increased.
    +           assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
    +           assertEquals(2, clusterSpecification.getNumberTaskManagers());
    +   }
    +
    +   @Test
    +   public void testCorrectSettingOfMaxSlots() throws Exception {
    +           String[] params =
    +                   new String[] {"-yn", "2", "-ys", "3"};
    +
    +           FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", 
"yarn");
    +
    +           final CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
    +
    +           final Configuration configuration = new Configuration();
    +
    +           AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createDescriptor(
    +                   configuration,
    +                   tmp.getRoot().getAbsolutePath(),
    +                   "",
    +                   commandLine);
    +
    +           final ClusterSpecification clusterSpecification = 
yarnCLI.createClusterSpecification(
    +                   configuration,
    +                   commandLine);
    +
    +           // each task manager has 3 slots but the parallelism is 7. Thus 
the slots should be increased.
    +           assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
    +           assertEquals(2, clusterSpecification.getNumberTaskManagers());
    +   }
    +
    +   @Test
    +   public void testZookeeperNamespaceProperty() throws Exception {
    +           String zkNamespaceCliInput = "flink_test_namespace";
    +
    +           String[] params = new String[] {"-yn", "2", "-yz", 
zkNamespaceCliInput};
    +
    +           FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", 
"yarn");
    +
    +           CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
    +
    +           AbstractYarnClusterDescriptor descriptor = 
yarnCLI.createDescriptor(
    +                   new Configuration(),
    +                   tmp.getRoot().getAbsolutePath(),
    +                   "",
    +                   commandLine);
    +
    +           assertEquals(zkNamespaceCliInput, 
descriptor.getZookeeperNamespace());
    +   }
    +
    +   /**
    +    * Test that the CliFrontend is able to pick up the .yarn-properties 
file from a specified location.
    +    */
    +   @Test
    +   public void testResumeFromYarnPropertiesFile() throws Exception {
    +
    +           File directoryPath = 
writeYarnPropertiesFile(validPropertiesFile);
    +
    +           final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli("y", "yarn");
    +
    +           final Configuration configuration = new Configuration();
    +           
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
    +
    +           final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
    +
    +           final String clusterId = flinkYarnSessionCli.getClusterId(
    +                   configuration,
    +                   commandLine);
    +
    +           assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId);
    +   }
    +
    +   @Test(expected = IllegalConfigurationException.class)
    +   public void testInvalidYarnPropertiesFile() throws Exception {
    +
    +           File directoryPath = 
writeYarnPropertiesFile(invalidPropertiesFile);
    +
    +           final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli("y", "yarn");
    +
    +           final Configuration configuration = new Configuration();
    +           
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
    +
    +           final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
    +
    +           flinkYarnSessionCli.getClusterId(
    +                   configuration,
    +                   commandLine);
    +
    +           fail("We should have failed reading the yarn properties file 
when retrieving the cluster id.");
    --- End diff --
    
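    Calling `fail()` at the end of a test that is annotated with `@Test(expected = ...)` is wrong: if the expected exception is not thrown, the `AssertionError` raised by `fail()` is itself reported as an unexpected exception. 
For example, the following test (which swallows the exception so that `fail()` is reached):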
    ```
    @Test(expected = IllegalConfigurationException.class)
        public void testInvalidYarnPropertiesFile() throws Exception {
                try {
                        File directoryPath = 
writeYarnPropertiesFile(invalidPropertiesFile);
    
                        final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli("y", "yarn");
    
                        final Configuration configuration = new Configuration();
                        
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
    
                        final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[]{}, true);
    
                        flinkYarnSessionCli.getClusterId(
                                configuration,
                                commandLine);
                } catch (final Exception e) {
    
                }
    
                fail("We should have failed reading the yarn properties file 
when retrieving the cluster id.");
        }
    ```
    
    fails with 
    ```
    java.lang.Exception: Unexpected exception, 
expected<org.apache.flink.configuration.IllegalConfigurationException> but 
was<java.lang.AssertionError>
    ```
    
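    The message passed to `fail()` (*We should have failed reading the yarn properties file when 
retrieving the cluster id.*) is lost, because JUnit only reports the mismatch between the expected and the actual exception type.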


---

Reply via email to