zentol commented on code in PR #19560:
URL: https://github.com/apache/flink/pull/19560#discussion_r857377443
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterClientFactoryTest.java:
##########
@@ -45,6 +45,6 @@ private void
testKubernetesClusterClientFactoryDiscoveryHelper(final String targ
final ClusterClientFactory<String> factory =
serviceLoader.getClusterClientFactory(configuration);
- assertTrue(factory instanceof KubernetesClusterClientFactory);
+ assertThat(factory instanceof KubernetesClusterClientFactory).isTrue();
Review Comment:
use proper isInstanceOf assertion
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesExtension.java:
##########
@@ -44,14 +48,14 @@ public class KubernetesResource extends ExternalResource {
public static void checkEnv() {
final String kubeConfigEnv = System.getenv("ITCASE_KUBECONFIG");
- Assume.assumeTrue(
- "ITCASE_KUBECONFIG environment is not set.",
- !StringUtils.isNullOrWhitespaceOnly(kubeConfigEnv));
+ assertThat(StringUtils.isNullOrWhitespaceOnly(kubeConfigEnv))
Review Comment:
this causes CI failures; you must use assumeThat here.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -37,18 +36,22 @@
import io.fabric8.kubernetes.api.model.NamedContextBuilder;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.utils.Serialization;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
/** Base test class for Kubernetes. */
-public class KubernetesTestBase extends TestLogger {
+public class KubernetesTestBase {
Review Comment:
can this class be package-private?
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -105,8 +106,8 @@ public final void setup() throws Exception {
onSetup();
}
- @After
- public void tearDown() throws Exception {
+ @AfterEach
+ private void tearDown() throws Exception {
Review Comment:
life-cycle methods should be package-private as per official junit javadocs.
```suggestion
void tearDown() throws Exception {
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java:
##########
@@ -77,20 +75,19 @@ public void testDynamicProperties() throws Exception {
final Configuration executorConfig =
cli.getEffectiveConfiguration(args);
final ClusterClientFactory<String> clientFactory =
getClusterClientFactory(executorConfig);
- Assert.assertNotNull(clientFactory);
+ assertThat(clientFactory).isNotNull();
final Map<String, String> executorConfigMap = executorConfig.toMap();
- assertEquals(4, executorConfigMap.size());
- assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
- assertEquals("-DappName=foobar",
executorConfigMap.get("env.java.opts"));
- assertEquals(
- tmp.getRoot().getAbsolutePath(),
- executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
-
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
+ assertThat(executorConfigMap).hasSize(4);
+ assertThat(executorConfigMap.get("akka.ask.timeout")).isEqualTo("5
min");
Review Comment:
This block can be re-written to:
`assertThat(executorConfigMap).contains(entry(<key>, <value>), entry(...),
...);`
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java:
##########
@@ -71,24 +68,21 @@ public void testNotLeader() throws Exception {
getLeaderCallback().notLeader();
electionEventHandler.waitForRevokeLeader();
- assertThat(electionEventHandler.isLeader(),
is(false));
- assertThat(
-
electionEventHandler.getConfirmedLeaderInformation(),
- is(LeaderInformation.empty()));
+
assertThat(electionEventHandler.isLeader()).isFalse();
+
assertThat(electionEventHandler.getConfirmedLeaderInformation())
+ .isEqualTo(LeaderInformation.empty());
// The ConfigMap should also be cleared
- assertThat(
-
getLeaderConfigMap().getData().get(LEADER_ADDRESS_KEY),
- is(nullValue()));
- assertThat(
-
getLeaderConfigMap().getData().get(LEADER_SESSION_ID_KEY),
- is(nullValue()));
+
assertThat(getLeaderConfigMap().getData().get(LEADER_ADDRESS_KEY))
Review Comment:
can be re-written as a doesNotContain assertion against the map itself
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java:
##########
@@ -44,7 +43,7 @@
import java.util.concurrent.TimeUnit;
/** Base class for high availability unit tests with a configured testing
Kubernetes client. */
-public class KubernetesHighAvailabilityTestBase extends TestLogger {
+public class KubernetesHighAvailabilityTestBase {
Review Comment:
can this be package-private?
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -87,8 +85,8 @@ public void setup() throws Exception {
data.size(), new
ExecutorThreadFactory("test-leader-io"));
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ private void teardown() throws Exception {
Review Comment:
```suggestion
void teardown() throws Exception {
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -40,17 +39,16 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.everyItem;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* IT Tests for {@link
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient} with real K8s
* server and client.
*/
-public class Fabric8FlinkKubeClientITCase extends TestLogger {
+public class Fabric8FlinkKubeClientITCase {
Review Comment:
```suggestion
class Fabric8FlinkKubeClientITCase {
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java:
##########
@@ -24,121 +24,120 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** General tests for the {@link AbstractKubernetesParameters}. */
-public class AbstractKubernetesParametersTest extends TestLogger {
+public class AbstractKubernetesParametersTest {
private final Configuration flinkConfig = new Configuration();
private final TestingKubernetesParameters testingKubernetesParameters =
new TestingKubernetesParameters(flinkConfig);
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
@Test
- public void testClusterIdMustNotBeBlank() {
+ void testClusterIdMustNotBeBlank() {
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, " ");
- assertThrows(
- "must not be blank",
- IllegalArgumentException.class,
- testingKubernetesParameters::getClusterId);
+ assertThatThrownBy(testingKubernetesParameters::getClusterId)
+ .satisfies(anyCauseMatches(IllegalArgumentException.class,
"must not be blank"));
}
@Test
- public void testClusterIdLengthLimitation() {
+ void testClusterIdLengthLimitation() {
final String stringWithIllegalLength =
StringUtils.generateRandomAlphanumericString(
new Random(),
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + 1);
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID,
stringWithIllegalLength);
- assertThrows(
- "must be no more than "
- + Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
- + " characters",
- IllegalArgumentException.class,
- testingKubernetesParameters::getClusterId);
+ assertThatThrownBy(testingKubernetesParameters::getClusterId)
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "must be no more than "
+ +
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
+ + " characters"));
}
@Test
- public void getConfigDirectory() {
+ void getConfigDirectory() {
final String confDir = "/path/of/flink-conf";
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
- assertThat(testingKubernetesParameters.getConfigDirectory(),
is(confDir));
+
assertThat(testingKubernetesParameters.getConfigDirectory()).isEqualTo(confDir);
}
@Test
- public void getConfigDirectoryFallbackToPodConfDir() {
+ void getConfigDirectoryFallbackToPodConfDir() {
final String confDirInPod =
flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
- assertThat(testingKubernetesParameters.getConfigDirectory(),
is(confDirInPod));
+
assertThat(testingKubernetesParameters.getConfigDirectory()).isEqualTo(confDirInPod);
}
@Test
- public void
testGetLocalHadoopConfigurationDirectoryReturnEmptyWhenHadoopEnvIsNotSet()
+ void
testGetLocalHadoopConfigurationDirectoryReturnEmptyWhenHadoopEnvIsNotSet()
throws Exception {
runTestWithEmptyEnv(
() -> {
final Optional<String> optional =
testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
- assertThat(optional.isPresent(), is(false));
+ assertThat(optional).isNotPresent();
});
}
@Test
- public void testGetLocalHadoopConfigurationDirectoryFromHadoopConfDirEnv()
throws Exception {
+ void testGetLocalHadoopConfigurationDirectoryFromHadoopConfDirEnv() throws
Exception {
runTestWithEmptyEnv(
() -> {
final String hadoopConfDir = "/etc/hadoop/conf";
setEnv(Constants.ENV_HADOOP_CONF_DIR, hadoopConfDir);
final Optional<String> optional =
testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
- assertThat(optional.isPresent(), is(true));
- assertThat(optional.get(), is(hadoopConfDir));
+ assertThat(optional).isPresent();
+ assertThat(optional.get()).isEqualTo(hadoopConfDir);
});
}
@Test
- public void testGetLocalHadoopConfigurationDirectoryFromHadoop2HomeEnv()
throws Exception {
+ void testGetLocalHadoopConfigurationDirectoryFromHadoop2HomeEnv(@TempDir
Path temporaryFolder)
+ throws Exception {
runTestWithEmptyEnv(
() -> {
- final String hadoopHome =
temporaryFolder.getRoot().getAbsolutePath();
- temporaryFolder.newFolder("etc", "hadoop");
+ final String hadoopHome =
temporaryFolder.toAbsolutePath().toString();
+
Files.createDirectories(temporaryFolder.resolve("etc/hadoop"));
Review Comment:
```suggestion
Files.createDirectories(temporaryFolder.resolve(Paths.get("etc", "hadoop")));
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java:
##########
@@ -106,17 +103,17 @@ protected void onSetup() throws Exception {
}
@Test
- public void testGetEnvironments() {
- assertEquals(customizedEnvs,
kubernetesTaskManagerParameters.getEnvironments());
+ void testGetEnvironments() {
+
assertThat(kubernetesTaskManagerParameters.getEnvironments()).isEqualTo(customizedEnvs);
}
@Test
- public void testGetEmptyAnnotations() {
- assertTrue(kubernetesTaskManagerParameters.getAnnotations().isEmpty());
+ void testGetEmptyAnnotations() {
+
assertThat(kubernetesTaskManagerParameters.getAnnotations().isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(kubernetesTaskManagerParameters.getAnnotations()).isEmpty();
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java:
##########
@@ -19,36 +19,33 @@
package org.apache.flink.kubernetes.kubeclient.resources;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.KubernetesExtension;
import
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
-import org.apache.flink.util.TestLogger;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.UUID;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* IT Tests for the {@link KubernetesLeaderElector}. Start multiple leader
contenders currently, one
* should elect successfully. And if current leader dies, a new one could take
over.
*/
-public class KubernetesLeaderElectorITCase extends TestLogger {
-
- @ClassRule public static KubernetesResource kubernetesResource = new
KubernetesResource();
+public class KubernetesLeaderElectorITCase {
+ @RegisterExtension
+ private static final KubernetesExtension kubernetesExtension = new
KubernetesExtension();
Review Comment:
```suggestion
static final KubernetesExtension kubernetesExtension = new
KubernetesExtension();
```
Please also check for occurrences in other files.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java:
##########
@@ -47,23 +47,21 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
/** Tests for {@link KubernetesResourceManagerDriver}. */
-public class KubernetesResourceManagerDriverTest
+class KubernetesResourceManagerDriverTest
extends ResourceManagerDriverTestBase<KubernetesWorkerNode> {
private static final String CLUSTER_ID = "testing-flink-cluster";
private static final KubernetesResourceManagerDriverConfiguration
KUBERNETES_RESOURCE_MANAGER_CONFIGURATION =
new
KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, "localhost:9000");
- @Test
- public void testOnPodAdded() throws Exception {
+ @org.junit.jupiter.api.Test
Review Comment:
Don't use the fully qualified annotation name; add a regular import for `org.junit.jupiter.api.Test` and use `@Test`.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -161,7 +163,7 @@ protected String writeKubeConfigForMockKubernetesServer()
throws Exception {
.build())
.withNewCurrentContext(CLUSTER_ID)
.build();
- final File kubeConfigFile = new
File(temporaryFolder.newFolder(".kube"), "config");
+ final File kubeConfigFile = new
File(kubeConfPath.resolve(".kube").toFile(), "config");
Review Comment:
```suggestion
final File kubeConfigFile =
kubeConfPath.resolve(".kube").resolve("config").toFile();
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -138,7 +139,8 @@ protected void generateKerberosFileItems() throws
IOException {
KubernetesTestUtils.createTemporyFile("some conf", kerberosDir,
KRB5_CONF_FILE);
}
- protected String writeKubeConfigForMockKubernetesServer() throws Exception
{
+ protected String writeKubeConfigForMockKubernetesServer(@TempDir Path
kubeConfPath)
Review Comment:
The annotation shouldn't work here, as it is neither a lifecycle nor test
method.
```suggestion
protected String writeKubeConfigForMockKubernetesServer(Path
kubeConfPath)
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -88,11 +87,13 @@ protected void setupFlinkConfig() {
protected void onSetup() throws Exception {}
- @Before
- public final void setup() throws Exception {
- flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
- hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();
- kerberosDir = temporaryFolder.newFolder().getAbsoluteFile();
+ @BeforeEach
+ private final void setup(
Review Comment:
```suggestion
void setup(
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesWorkerResourceSpecFactoryTest.java:
##########
@@ -22,48 +22,43 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KubernetesWorkerResourceSpecFactory}. */
-public class KubernetesWorkerResourceSpecFactoryTest extends TestLogger {
+class KubernetesWorkerResourceSpecFactoryTest {
@Test
- public void testGetCpuCoresCommonOption() {
+ void testGetCpuCoresCommonOption() {
final Configuration configuration = new Configuration();
configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR,
1.5);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
- assertThat(
-
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
- is(new CPUResource(1.0)));
+
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+ .isEqualByComparingTo(new CPUResource(1.0));
}
@Test
- public void testGetCpuCoresKubernetesOption() {
+ void testGetCpuCoresKubernetesOption() {
final Configuration configuration = new Configuration();
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR,
1.5);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
- assertThat(
-
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
- is(new CPUResource(2.0)));
+
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+ .isEqualByComparingTo(new CPUResource(2.0));
}
@Test
- public void testGetCpuCoresNumSlots() {
+ void testGetCpuCoresNumSlots() {
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
- assertThat(
-
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
- is(new CPUResource(3.0)));
+
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+ .isEqualByComparingTo(new CPUResource(3.0));
Review Comment:
isEqualTo should work just fine.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -67,40 +68,47 @@
* Tests for recovering from savepoint when Kubernetes HA is enabled. The
savepoint will be
* persisted as a checkpoint and stored in the ConfigMap when recovered
successfully.
*/
-public class KubernetesHighAvailabilityRecoverFromSavepointITCase extends
TestLogger {
+public class KubernetesHighAvailabilityRecoverFromSavepointITCase {
private static final long TIMEOUT = 60 * 1000;
private static final String CLUSTER_ID = "flink-on-k8s-cluster-" +
System.currentTimeMillis();
private static final String FLAT_MAP_UID = "my-flat-map";
- @ClassRule public static KubernetesResource kubernetesResource = new
KubernetesResource();
+ private static Path temporaryPath;
Review Comment:
this should have the `@TempDir` annotation.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -361,138 +355,121 @@ private void
testNodePortService(KubernetesConfigOptions.NodePortAddressType add
throw new IllegalArgumentException(
String.format("Unexpected address type %s.",
addressType));
}
- assertThat(resultEndpoint.get().getAddress(), isIn(expectedIps));
- assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
+ assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
+ assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
}
}
@Test
- public void testNodePortServiceWithNoMatchingIP() {
+ void testNodePortServiceWithNoMatchingIP() {
mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
- assertFalse(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent());
+
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent()).isFalse();
Review Comment:
```suggestion
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID)).isNotPresent();
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -130,19 +139,19 @@ public void testRecoverFromSavepoint() throws Exception {
assertThat(clusterClient.requestJobResult(jobId).join().isSuccess()).isTrue();
}
- private Configuration getConfiguration() {
+ private static Configuration getConfiguration() {
Configuration configuration = new Configuration();
configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
configuration.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
try {
- configuration.set(
- HighAvailabilityOptions.HA_STORAGE_PATH,
- temporaryFolder.newFolder().getAbsolutePath());
+ temporaryPath = Files.createTempDirectory("haStorage");
} catch (IOException e) {
- throw new FlinkRuntimeException("Failed to create HA storage", e);
+ throw new RuntimeException("can't create ha storage path.");
Review Comment:
why are you changing this?
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -399,31 +406,38 @@ public void
testReplaceWithDeletingKeyWithFailingDiscard() throws Exception {
final TestingLongStateHandleHelper.LongStateHandle
newState =
new
TestingLongStateHandleHelper.LongStateHandle(23456L);
- assertThat(store.exists(key),
is(StringResourceVersion.notExisting()));
- final StateHandleStore.NotExistException exception
=
- assertThrows(
-
StateHandleStore.NotExistException.class,
+ assertThat(store.exists(key))
+
.isEqualByComparingTo(StringResourceVersion.notExisting());
+ StateHandleStore.NotExistException exception =
+ catchThrowableOfType(
Review Comment:
This can still be modeled as `assertThatThrownBy().satisfies(...);`
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -67,40 +68,47 @@
* Tests for recovering from savepoint when Kubernetes HA is enabled. The
savepoint will be
* persisted as a checkpoint and stored in the ConfigMap when recovered
successfully.
*/
-public class KubernetesHighAvailabilityRecoverFromSavepointITCase extends
TestLogger {
+public class KubernetesHighAvailabilityRecoverFromSavepointITCase {
private static final long TIMEOUT = 60 * 1000;
private static final String CLUSTER_ID = "flink-on-k8s-cluster-" +
System.currentTimeMillis();
private static final String FLAT_MAP_UID = "my-flat-map";
- @ClassRule public static KubernetesResource kubernetesResource = new
KubernetesResource();
+ private static Path temporaryPath;
- @ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
-
- @Rule
- public MiniClusterWithClientResource miniClusterResource =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ @Order(1)
Review Comment:
why is the order relevant?
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -69,9 +67,9 @@ public class Fabric8FlinkKubeClientITCase extends TestLogger {
private ExecutorService executorService;
- @Before
- public void setup() throws Exception {
- flinkKubeClient = kubernetesResource.getFlinkKubeClient();
+ @BeforeEach
+ private void setup() throws Exception {
Review Comment:
```suggestion
void setup() throws Exception {
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java:
##########
@@ -77,20 +75,19 @@ public void testDynamicProperties() throws Exception {
final Configuration executorConfig =
cli.getEffectiveConfiguration(args);
final ClusterClientFactory<String> clientFactory =
getClusterClientFactory(executorConfig);
- Assert.assertNotNull(clientFactory);
+ assertThat(clientFactory).isNotNull();
final Map<String, String> executorConfigMap = executorConfig.toMap();
- assertEquals(4, executorConfigMap.size());
- assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
- assertEquals("-DappName=foobar",
executorConfigMap.get("env.java.opts"));
- assertEquals(
- tmp.getRoot().getAbsolutePath(),
- executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
-
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
+ assertThat(executorConfigMap).hasSize(4);
+ assertThat(executorConfigMap.get("akka.ask.timeout")).isEqualTo("5
min");
+
assertThat(executorConfigMap.get("env.java.opts")).isEqualTo("-DappName=foobar");
+ assertThat(confDirPath.toAbsolutePath().toString())
+
.isEqualTo(executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
Review Comment:
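invert assertion: the value under test (`executorConfig.get(DeploymentOptionsInternal.CONF_DIR)`) belongs in `assertThat(...)`, and the expected path (`confDirPath.toAbsolutePath().toString()`) in `isEqualTo(...)`.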
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -575,38 +594,40 @@ public void
testReplaceFailedWithPossiblyInconsistentState() throws Exception {
LOCK_IDENTITY);
final StringResourceVersion resourceVersion =
anotherStore.exists(key);
- assertThat(resourceVersion.isExisting(), is(true));
- try {
- anotherStore.replace(
- key,
- resourceVersion,
- new
TestingLongStateHandleHelper.LongStateHandle(23456L));
- fail(
- "An exception having a
PossibleInconsistentStateException as its cause should have been thrown.");
- } catch (Exception ex) {
- assertThat(ex, is(updateException));
- }
- assertThat(anotherStore.getAllAndLock().size(),
is(1));
+ assertThat(resourceVersion.isExisting()).isTrue();
+ assertThatThrownBy(
+ () ->
+ anotherStore.replace(
+ key,
+ resourceVersion,
+ new
TestingLongStateHandleHelper
+
.LongStateHandle(23456L)),
+ "An exception having a
PossibleInconsistentStateException as its cause should have been thrown.")
+ .satisfies(
+ cause ->
assertThat(cause).isEqualTo(updateException));
Review Comment:
```suggestion
.isEqualTo(updateException);
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -361,138 +355,121 @@ private void
testNodePortService(KubernetesConfigOptions.NodePortAddressType add
throw new IllegalArgumentException(
String.format("Unexpected address type %s.",
addressType));
}
- assertThat(resultEndpoint.get().getAddress(), isIn(expectedIps));
- assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
+ assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
+ assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
}
}
@Test
- public void testNodePortServiceWithNoMatchingIP() {
+ void testNodePortServiceWithNoMatchingIP() {
mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
- assertFalse(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent());
+
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent()).isFalse();
}
@Test
- public void testClusterIPService() {
+ void testClusterIPService() {
mockExpectedServiceFromServerSide(buildExternalServiceWithClusterIP());
final Optional<Endpoint> resultEndpoint =
flinkKubeClient.getRestEndpoint(CLUSTER_ID);
- assertThat(resultEndpoint.isPresent(), is(true));
- assertThat(
- resultEndpoint.get().getAddress(),
- is(
+ assertThat(resultEndpoint).isPresent();
+ assertThat(resultEndpoint.get().getAddress())
+ .isEqualTo(
ExternalServiceDecorator.getNamespacedExternalServiceName(
- CLUSTER_ID, NAMESPACE)));
- assertThat(resultEndpoint.get().getPort(), is(REST_PORT));
+ CLUSTER_ID, NAMESPACE));
+ assertThat(resultEndpoint.get().getPort()).isEqualTo(REST_PORT);
}
@Test
- public void testStopAndCleanupCluster() throws Exception {
+ void testStopAndCleanupCluster() throws Exception {
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);
final KubernetesPod kubernetesPod =
buildKubernetesPod(TASKMANAGER_POD_NAME);
this.flinkKubeClient.createTaskManagerPod(kubernetesPod).get();
- assertEquals(
- 1,
- this.kubeClient
- .apps()
- .deployments()
- .inNamespace(NAMESPACE)
- .list()
- .getItems()
- .size());
- assertEquals(
- 1,
this.kubeClient.configMaps().inNamespace(NAMESPACE).list().getItems().size());
- assertEquals(2,
this.kubeClient.services().inNamespace(NAMESPACE).list().getItems().size());
- assertEquals(1,
this.kubeClient.pods().inNamespace(NAMESPACE).list().getItems().size());
+ assertThat(
+ this.kubeClient
+ .apps()
+ .deployments()
+ .inNamespace(NAMESPACE)
+ .list()
+ .getItems()
+ .size())
+ .isEqualTo(1);
+
assertThat(this.kubeClient.configMaps().inNamespace(NAMESPACE).list().getItems().size())
+ .isEqualTo(1);
+
assertThat(this.kubeClient.services().inNamespace(NAMESPACE).list().getItems()).hasSize(2);
+
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).list().getItems()).hasSize(1);
this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
- assertTrue(
- this.kubeClient
- .apps()
- .deployments()
- .inNamespace(NAMESPACE)
- .list()
- .getItems()
- .isEmpty());
+
assertThat(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems())
+ .isEmpty();
}
@Test
- public void testCreateConfigMap() throws Exception {
+ void testCreateConfigMap() throws Exception {
final KubernetesConfigMap configMap = buildTestingConfigMap();
this.flinkKubeClient.createConfigMap(configMap).get();
final Optional<KubernetesConfigMap> currentOpt =
this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME);
- assertThat(currentOpt.isPresent(), is(true));
- assertThat(
- currentOpt.get().getData().get(TESTING_CONFIG_MAP_KEY),
- is(TESTING_CONFIG_MAP_VALUE));
+ assertThat(currentOpt).isPresent();
+ assertThat(currentOpt.get().getData())
+ .containsEntry(TESTING_CONFIG_MAP_KEY,
TESTING_CONFIG_MAP_VALUE);
Review Comment:
could be combined to `assertThat(currentOpt).hasValueSatisfying(...)`
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
@Test
- public void testCopyFlinkPod() {
+ void testCopyFlinkPod() {
final FlinkPod flinkPod =
KubernetesUtils.loadPodFromTemplateFile(
flinkKubeClient,
KubernetesPodTemplateTestUtils.getPodTemplateFile(),
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
final FlinkPod copiedFlinkPod = flinkPod.copy();
- assertThat(flinkPod == copiedFlinkPod, is(false));
+ assertThat(flinkPod == copiedFlinkPod).isFalse();
assertThat(
- flinkPod.getPodWithoutMainContainer()
- == copiedFlinkPod.getPodWithoutMainContainer(),
- is(false));
- assertThat(
- flinkPod.getPodWithoutMainContainer(),
- is(equalTo(copiedFlinkPod.getPodWithoutMainContainer())));
- assertThat(flinkPod.getMainContainer() ==
copiedFlinkPod.getMainContainer(), is(false));
- assertThat(flinkPod.getMainContainer(),
is(equalTo(copiedFlinkPod.getMainContainer())));
+ flinkPod.getPodWithoutMainContainer()
Review Comment:
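Use a dedicated assertion instead of `assertThat(a == b).isFalse()`. Note that `isNotEqualTo` compares via `equals`, while this check is about reference identity (the two objects are expected to be equal but distinct instances), so `isNotSameAs` is the matching assertion here.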
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
@Test
- public void testCopyFlinkPod() {
+ void testCopyFlinkPod() {
final FlinkPod flinkPod =
KubernetesUtils.loadPodFromTemplateFile(
flinkKubeClient,
KubernetesPodTemplateTestUtils.getPodTemplateFile(),
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
final FlinkPod copiedFlinkPod = flinkPod.copy();
- assertThat(flinkPod == copiedFlinkPod, is(false));
+ assertThat(flinkPod == copiedFlinkPod).isFalse();
assertThat(
- flinkPod.getPodWithoutMainContainer()
- == copiedFlinkPod.getPodWithoutMainContainer(),
- is(false));
- assertThat(
- flinkPod.getPodWithoutMainContainer(),
- is(equalTo(copiedFlinkPod.getPodWithoutMainContainer())));
- assertThat(flinkPod.getMainContainer() ==
copiedFlinkPod.getMainContainer(), is(false));
- assertThat(flinkPod.getMainContainer(),
is(equalTo(copiedFlinkPod.getMainContainer())));
+ flinkPod.getPodWithoutMainContainer()
+ == copiedFlinkPod.getPodWithoutMainContainer())
+ .isFalse();
+ assertThat(flinkPod.getPodWithoutMainContainer())
+ .isEqualTo(copiedFlinkPod.getPodWithoutMainContainer());
+ assertThat(flinkPod.getMainContainer() ==
copiedFlinkPod.getMainContainer()).isFalse();
Review Comment:
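Use a dedicated assertion instead of `assertThat(a == b).isFalse()`. Note that `isNotEqualTo` compares via `equals`, while `==` checks reference identity, so `assertThat(flinkPod.getMainContainer()).isNotSameAs(copiedFlinkPod.getMainContainer())` is the matching assertion here.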
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
@Test
- public void testCopyFlinkPod() {
+ void testCopyFlinkPod() {
final FlinkPod flinkPod =
KubernetesUtils.loadPodFromTemplateFile(
flinkKubeClient,
KubernetesPodTemplateTestUtils.getPodTemplateFile(),
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
final FlinkPod copiedFlinkPod = flinkPod.copy();
- assertThat(flinkPod == copiedFlinkPod, is(false));
+ assertThat(flinkPod == copiedFlinkPod).isFalse();
Review Comment:
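Use a dedicated assertion instead of `assertThat(flinkPod == copiedFlinkPod).isFalse()`. Note that `isNotEqualTo` compares via `equals`, while `==` checks reference identity, so `assertThat(flinkPod).isNotSameAs(copiedFlinkPod)` is the matching assertion here.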
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java:
##########
@@ -75,25 +71,23 @@ public void testCallbackHandler() {
podsWatcher.eventReceived(Watcher.Action.DELETED,
pod.getPodWithoutMainContainer());
podsWatcher.eventReceived(Watcher.Action.ERROR,
pod.getPodWithoutMainContainer());
- assertThat(podAddedList.size(), is(1));
- assertThat(podModifiedList.size(), is(1));
- assertThat(podDeletedList.size(), is(1));
- assertThat(podErrorList.size(), is(1));
+ assertThat(podAddedList).hasSize(1);
+ assertThat(podModifiedList).hasSize(1);
+ assertThat(podDeletedList).hasSize(1);
+ assertThat(podErrorList).hasSize(1);
}
@Test
- public void testClosingWithTooOldResourceVersion() {
+ void testClosingWithTooOldResourceVersion() {
final String errMsg = "too old resource version";
final KubernetesPodsWatcher podsWatcher =
new KubernetesPodsWatcher(
new TestingCallbackHandler(
e -> {
- assertThat(
- e,
- Matchers.instanceOf(
-
KubernetesTooOldResourceVersionException
- .class));
- assertThat(e,
FlinkMatchers.containsMessage(errMsg));
+ assertThat(e)
+ .isInstanceOf(
+
KubernetesTooOldResourceVersionException.class);
+ assertThat(e).hasMessageContaining(errMsg);
Review Comment:
```suggestion
assertThat(e)
.isInstanceOf(
KubernetesTooOldResourceVersionException.class)
.hasMessageContaining(errMsg);
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java:
##########
@@ -76,16 +72,16 @@ public void testGetEnvironments() {
final Map<String, String> resultEnvironments =
kubernetesJobManagerParameters.getEnvironments();
- assertEquals(expectedEnvironments, resultEnvironments);
+ assertThat(resultEnvironments).isEqualTo(expectedEnvironments);
}
@Test
- public void testGetEmptyAnnotations() {
- assertTrue(kubernetesJobManagerParameters.getAnnotations().isEmpty());
+ void testGetEmptyAnnotations() {
+
assertThat(kubernetesJobManagerParameters.getAnnotations().isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(kubernetesJobManagerParameters.getAnnotations()).isEmpty();
```
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -571,35 +543,34 @@ public void
testCheckAndUpdateConfigMapWhenGetConfigMapFailed() throws Exception
mockGetConfigMapFailed(configMap.getInternalResource());
final int initialRequestCount = server.getRequestCount();
- try {
- this.flinkKubeClient
- .checkAndUpdateConfigMap(
- TESTING_CONFIG_MAP_NAME,
- c -> {
- throw new AssertionError(
- "The replace operation should have
never been triggered.");
- })
- .get();
- fail(
- "checkAndUpdateConfigMap should fail without a
PossibleInconsistentStateException being the cause when number of retries has
been exhausted.");
- } catch (Exception ex) {
- assertThat(
- ex,
- FlinkMatchers.containsMessage(
- "Could not complete the "
- + "operation. Number of retries has been
exhausted."));
- final int actualRetryCount = server.getRequestCount() -
initialRequestCount;
- assertThat(actualRetryCount, is(configuredRetries + 1));
- assertThat(
- "An error while retrieving the ConfigMap should not cause
a PossibleInconsistentStateException.",
- ExceptionUtils.findThrowable(ex,
PossibleInconsistentStateException.class)
- .isPresent(),
- is(false));
- }
+ assertThatThrownBy(
+ () ->
+ this.flinkKubeClient
+ .checkAndUpdateConfigMap(
+ TESTING_CONFIG_MAP_NAME,
+ c -> {
+ throw new AssertionError(
+ "The replace
operation should have never been triggered.");
+ })
+ .get(),
+ "checkAndUpdateConfigMap should fail without a
PossibleInconsistentStateException being the cause when number of retries has
been exhausted.")
+ .satisfies(
+ anyCauseMatches(
+ "Could not complete the "
+ + "operation. Number of retries has
been exhausted."))
Review Comment:
```suggestion
"Could not complete the operation. Number of retries has been exhausted."))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]