Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5225#discussion_r160110442
--- Diff:
flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link FlinkYarnSessionCli}.
+ */
+public class FlinkYarnSessionCliTest extends TestLogger {
+
+ private static final ApplicationId TEST_YARN_APPLICATION_ID =
ApplicationId.newInstance(System.currentTimeMillis(), 42);
+
+ private static final ApplicationId TEST_YARN_APPLICATION_ID_2 =
ApplicationId.newInstance(System.currentTimeMillis(), 43);
+
+ private static final String TEST_YARN_JOB_MANAGER_ADDRESS =
"22.33.44.55";
+ private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+
+ private static final String validPropertiesFile = "applicationID=" +
TEST_YARN_APPLICATION_ID;
+
+ private static final String invalidPropertiesFile = "jasfobManager=" +
TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ @Test
+ public void testDynamicProperties() throws Exception {
+
+ FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+ "",
+ "",
+ false);
+ Options options = new Options();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(options, new String[]{"run",
"-j", "fake.jar", "-n", "15",
+ "-D", "akka.ask.timeout=5 min", "-D",
"env.java.opts=-DappName=foobar"});
+
+ AbstractYarnClusterDescriptor flinkYarnDescriptor =
cli.createDescriptor(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ null,
+ cmd);
+
+ Assert.assertNotNull(flinkYarnDescriptor);
+
+ Map<String, String> dynProperties =
+
FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
+ assertEquals(2, dynProperties.size());
+ assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
+ assertEquals("-DappName=foobar",
dynProperties.get("env.java.opts"));
+ }
+
+ @Test
+ public void testNotEnoughTaskSlots() throws Exception {
+ String[] params =
+ new String[] {"-yn", "2", "-ys", "3", "-p", "7"};
+
+ FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y",
"yarn");
+
+ Options options = new Options();
+ // TODO: Nasty workaround: We should get rid of the YarnCLI and
run options coupling
+ options.addOption(CliFrontendParser.PARALLELISM_OPTION);
+ yarnCLI.addGeneralOptions(options);
+ yarnCLI.addRunOptions(options);
+
+ final CommandLine commandLine =
CliFrontendParser.parse(options, params, true);
+
+ ClusterSpecification clusterSpecification =
yarnCLI.createClusterSpecification(new Configuration(), commandLine);
+
+ // each task manager has 3 slots but the parallelism is 7. Thus
the slots should be increased.
+ assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
+ assertEquals(2, clusterSpecification.getNumberTaskManagers());
+ }
+
+ @Test
+ public void testCorrectSettingOfMaxSlots() throws Exception {
+ String[] params =
+ new String[] {"-yn", "2", "-ys", "3"};
+
+ FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y",
"yarn");
+
+ final CommandLine commandLine =
yarnCLI.parseCommandLineOptions(params, true);
+
+ final Configuration configuration = new Configuration();
+
+ AbstractYarnClusterDescriptor descriptor =
yarnCLI.createDescriptor(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "",
+ commandLine);
+
+ final ClusterSpecification clusterSpecification =
yarnCLI.createClusterSpecification(
+ configuration,
+ commandLine);
+
+ // each task manager has 3 slots but the parallelism is 7. Thus
the slots should be increased.
+ assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
+ assertEquals(2, clusterSpecification.getNumberTaskManagers());
+ }
+
+ @Test
+ public void testZookeeperNamespaceProperty() throws Exception {
+ String zkNamespaceCliInput = "flink_test_namespace";
+
+ String[] params = new String[] {"-yn", "2", "-yz",
zkNamespaceCliInput};
+
+ FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli("y",
"yarn");
+
+ CommandLine commandLine =
yarnCLI.parseCommandLineOptions(params, true);
+
+ AbstractYarnClusterDescriptor descriptor =
yarnCLI.createDescriptor(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "",
+ commandLine);
+
+ assertEquals(zkNamespaceCliInput,
descriptor.getZookeeperNamespace());
+ }
+
+ /**
+ * Test that the CliFrontend is able to pick up the .yarn-properties
file from a specified location.
+ */
+ @Test
+ public void testResumeFromYarnPropertiesFile() throws Exception {
+
+ File directoryPath =
writeYarnPropertiesFile(validPropertiesFile);
+
+ final FlinkYarnSessionCli flinkYarnSessionCli = new
FlinkYarnSessionCli("y", "yarn");
+
+ final Configuration configuration = new Configuration();
+
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION,
directoryPath.getAbsolutePath());
+
+ final CommandLine commandLine =
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
+
+ final String clusterId = flinkYarnSessionCli.getClusterId(
+ configuration,
+ commandLine);
+
+ assertEquals(TEST_YARN_APPLICATION_ID.toString(), clusterId);
+ }
+
+ @Test(expected = IllegalConfigurationException.class)
+ public void testInvalidYarnPropertiesFile() throws Exception {
+
+ File directoryPath =
writeYarnPropertiesFile(invalidPropertiesFile);
+
+ final FlinkYarnSessionCli flinkYarnSessionCli = new
FlinkYarnSessionCli("y", "yarn");
+
+ final Configuration configuration = new Configuration();
+
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION,
directoryPath.getAbsolutePath());
+
+ final CommandLine commandLine =
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
+
+ flinkYarnSessionCli.getClusterId(
+ configuration,
+ commandLine);
+
+ fail("We should have failed reading the yarn properties file
when retrieving the cluster id.");
--- End diff --
It is wrong to put `fail()` if your test expects an exception to be thrown.
For example:
```
@Test(expected = IllegalConfigurationException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
try {
File directoryPath =
writeYarnPropertiesFile(invalidPropertiesFile);
final FlinkYarnSessionCli flinkYarnSessionCli = new
FlinkYarnSessionCli("y", "yarn");
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION,
directoryPath.getAbsolutePath());
final CommandLine commandLine =
flinkYarnSessionCli.parseCommandLineOptions(new String[]{}, true);
flinkYarnSessionCli.getClusterId(
configuration,
commandLine);
} catch (final Exception e) {
}
fail("We should have failed reading the yarn properties file
when retrieving the cluster id.");
}
```
fails with
```
java.lang.Exception: Unexpected exception,
expected<org.apache.flink.configuration.IllegalConfigurationException> but
was<java.lang.AssertionError>
```
The message *We should have failed reading the yarn properties file when
retrieving the cluster id.* is lost.
---