Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5225#discussion_r160110442 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link FlinkYarnSessionCli}. 
+ */ +public class FlinkYarnSessionCliTest extends TestLogger { + + private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42); + + private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(), 43); + + private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; + private static final int TEST_YARN_JOB_MANAGER_PORT = 6655; + + private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID; + + private static final String invalidPropertiesFile = "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT; + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testDynamicProperties() throws Exception { + + FlinkYarnSessionCli cli = new FlinkYarnSessionCli( + "", + "", + false); + Options options = new Options(); + cli.addGeneralOptions(options); + cli.addRunOptions(options); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", + "-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"}); + + AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + null, + cmd); + + Assert.assertNotNull(flinkYarnDescriptor); + + Map<String, String> dynProperties = + FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded()); + assertEquals(2, dynProperties.size()); + assertEquals("5 min", dynProperties.get("akka.ask.timeout")); + assertEquals("-DappName=foobar", dynProperties.get("env.java.opts")); + } + + @Test + public void testNotEnoughTaskSlots() throws Exception { + String[] params = + new String[] {"-yn", "2", "-ys", "3", "-p", "7"}; + + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + + Options options = new Options(); + // 
TODO: Nasty workaround: We should get rid of the YarnCLI and run options coupling + options.addOption(CliFrontendParser.PARALLELISM_OPTION); + yarnCLI.addGeneralOptions(options); + yarnCLI.addRunOptions(options); + + final CommandLine commandLine = CliFrontendParser.parse(options, params, true); + + ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), commandLine); + + // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. + assertEquals(4, clusterSpecification.getSlotsPerTaskManager()); + assertEquals(2, clusterSpecification.getNumberTaskManagers()); + } + + @Test + public void testCorrectSettingOfMaxSlots() throws Exception { + String[] params = + new String[] {"-yn", "2", "-ys", "3"}; + + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + + final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); + + final Configuration configuration = new Configuration(); + + AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor( + configuration, + tmp.getRoot().getAbsolutePath(), + "", + commandLine); + + final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification( + configuration, + commandLine); + + // each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased. 
+ assertEquals(3, clusterSpecification.getSlotsPerTaskManager()); + assertEquals(2, clusterSpecification.getNumberTaskManagers()); + } + + @Test + public void testZookeeperNamespaceProperty() throws Exception { + String zkNamespaceCliInput = "flink_test_namespace"; + + String[] params = new String[] {"-yn", "2", "-yz", zkNamespaceCliInput}; + + FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y", "yarn"); + + CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); + + AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor( + new Configuration(), + tmp.getRoot().getAbsolutePath(), + "", + commandLine); + + assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace()); + } + + /** + * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location. + */ + @Test + public void testResumeFromYarnPropertiesFile() throws Exception { + + File directoryPath = writeYarnPropertiesFile(validPropertiesFile); + + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); + + final Configuration configuration = new Configuration(); + configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); + + final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true); + + final String clusterId = flinkYarnSessionCli.getClusterId( + configuration, + commandLine); + + assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId); + } + + @Test(expected = IllegalConfigurationException.class) + public void testInvalidYarnPropertiesFile() throws Exception { + + File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile); + + final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); + + final Configuration configuration = new Configuration(); + configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); + + final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true); + + flinkYarnSessionCli.getClusterId( + configuration, + commandLine); + + fail("We should have failed reading the yarn properties file when retrieving the cluster id."); --- End diff -- It is wrong to call `fail()` at the end of a test that declares its expected exception via `@Test(expected = ...)`. If the expected exception is not thrown, the `AssertionError` raised by `fail()` is itself reported as an unexpected exception. For example, the following test, which swallows the exception to simulate that case: ``` @Test(expected = IllegalConfigurationException.class) public void testInvalidYarnPropertiesFile() throws Exception { try { File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile); final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli("y", "yarn"); final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[]{}, true); flinkYarnSessionCli.getClusterId( configuration, commandLine); } catch (final Exception e) { } fail("We should have failed reading the yarn properties file when retrieving the cluster id."); } ``` fails with ``` java.lang.Exception: Unexpected exception, expected<org.apache.flink.configuration.IllegalConfigurationException> but was<java.lang.AssertionError> ``` The message *We should have failed reading the yarn properties file when retrieving the cluster id.* is lost.
---