zentol commented on code in PR #19560:
URL: https://github.com/apache/flink/pull/19560#discussion_r857377443


##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterClientFactoryTest.java:
##########
@@ -45,6 +45,6 @@ private void 
testKubernetesClusterClientFactoryDiscoveryHelper(final String targ
         final ClusterClientFactory<String> factory =
                 serviceLoader.getClusterClientFactory(configuration);
 
-        assertTrue(factory instanceof KubernetesClusterClientFactory);
+        assertThat(factory instanceof KubernetesClusterClientFactory).isTrue();

Review Comment:
   Use a proper `isInstanceOf` assertion, e.g. 
   `assertThat(factory).isInstanceOf(KubernetesClusterClientFactory.class)`.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesExtension.java:
##########
@@ -44,14 +48,14 @@ public class KubernetesResource extends ExternalResource {
 
     public static void checkEnv() {
         final String kubeConfigEnv = System.getenv("ITCASE_KUBECONFIG");
-        Assume.assumeTrue(
-                "ITCASE_KUBECONFIG environment is not set.",
-                !StringUtils.isNullOrWhitespaceOnly(kubeConfigEnv));
+        assertThat(StringUtils.isNullOrWhitespaceOnly(kubeConfigEnv))

Review Comment:
   this causes CI failures; you must use assumeThat here.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -37,18 +36,22 @@
 import io.fabric8.kubernetes.api.model.NamedContextBuilder;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.utils.Serialization;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
 /** Base test class for Kubernetes. */
-public class KubernetesTestBase extends TestLogger {
+public class KubernetesTestBase {

Review Comment:
   can this class be package-private?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -105,8 +106,8 @@ public final void setup() throws Exception {
         onSetup();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    private void tearDown() throws Exception {

Review Comment:
   life-cycle methods should be package-private as per official junit javadocs.
   
   ```suggestion
       void tearDown() throws Exception {
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java:
##########
@@ -77,20 +75,19 @@ public void testDynamicProperties() throws Exception {
         final Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
         final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
 
-        Assert.assertNotNull(clientFactory);
+        assertThat(clientFactory).isNotNull();
 
         final Map<String, String> executorConfigMap = executorConfig.toMap();
-        assertEquals(4, executorConfigMap.size());
-        assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
-        assertEquals("-DappName=foobar", 
executorConfigMap.get("env.java.opts"));
-        assertEquals(
-                tmp.getRoot().getAbsolutePath(),
-                executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
-        
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
+        assertThat(executorConfigMap).hasSize(4);
+        assertThat(executorConfigMap.get("akka.ask.timeout")).isEqualTo("5 
min");

Review Comment:
   This block can be re-written to:
   `assertThat(executorConfigMap).contains(entry(<key>, <value>), entry(...), 
...);`



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriverTest.java:
##########
@@ -71,24 +68,21 @@ public void testNotLeader() throws Exception {
                             getLeaderCallback().notLeader();
 
                             electionEventHandler.waitForRevokeLeader();
-                            assertThat(electionEventHandler.isLeader(), 
is(false));
-                            assertThat(
-                                    
electionEventHandler.getConfirmedLeaderInformation(),
-                                    is(LeaderInformation.empty()));
+                            
assertThat(electionEventHandler.isLeader()).isFalse();
+                            
assertThat(electionEventHandler.getConfirmedLeaderInformation())
+                                    .isEqualTo(LeaderInformation.empty());
                             // The ConfigMap should also be cleared
-                            assertThat(
-                                    
getLeaderConfigMap().getData().get(LEADER_ADDRESS_KEY),
-                                    is(nullValue()));
-                            assertThat(
-                                    
getLeaderConfigMap().getData().get(LEADER_SESSION_ID_KEY),
-                                    is(nullValue()));
+                            
assertThat(getLeaderConfigMap().getData().get(LEADER_ADDRESS_KEY))

Review Comment:
   can be re-written as a doesNotContain assertion against the map itself



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java:
##########
@@ -44,7 +43,7 @@
 import java.util.concurrent.TimeUnit;
 
 /** Base class for high availability unit tests with a configured testing 
Kubernetes client. */
-public class KubernetesHighAvailabilityTestBase extends TestLogger {
+public class KubernetesHighAvailabilityTestBase {

Review Comment:
   can this be package-private?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -87,8 +85,8 @@ public void setup() throws Exception {
                         data.size(), new 
ExecutorThreadFactory("test-leader-io"));
     }
 
-    @After
-    public void teardown() throws Exception {
+    @AfterEach
+    private void teardown() throws Exception {

Review Comment:
   ```suggestion
       void teardown() throws Exception {
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -40,17 +39,16 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.everyItem;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * IT Tests for {@link 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient} with real K8s
  * server and client.
  */
-public class Fabric8FlinkKubeClientITCase extends TestLogger {
+public class Fabric8FlinkKubeClientITCase {

Review Comment:
   ```suggestion
   class Fabric8FlinkKubeClientITCase {
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java:
##########
@@ -24,121 +24,120 @@
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 
-import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** General tests for the {@link AbstractKubernetesParameters}. */
-public class AbstractKubernetesParametersTest extends TestLogger {
+public class AbstractKubernetesParametersTest {
 
     private final Configuration flinkConfig = new Configuration();
     private final TestingKubernetesParameters testingKubernetesParameters =
             new TestingKubernetesParameters(flinkConfig);
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
     @Test
-    public void testClusterIdMustNotBeBlank() {
+    void testClusterIdMustNotBeBlank() {
         flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "  ");
-        assertThrows(
-                "must not be blank",
-                IllegalArgumentException.class,
-                testingKubernetesParameters::getClusterId);
+        assertThatThrownBy(testingKubernetesParameters::getClusterId)
+                .satisfies(anyCauseMatches(IllegalArgumentException.class, 
"must not be blank"));
     }
 
     @Test
-    public void testClusterIdLengthLimitation() {
+    void testClusterIdLengthLimitation() {
         final String stringWithIllegalLength =
                 StringUtils.generateRandomAlphanumericString(
                         new Random(), 
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + 1);
         flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, 
stringWithIllegalLength);
-        assertThrows(
-                "must be no more than "
-                        + Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
-                        + " characters",
-                IllegalArgumentException.class,
-                testingKubernetesParameters::getClusterId);
+        assertThatThrownBy(testingKubernetesParameters::getClusterId)
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "must be no more than "
+                                        + 
Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
+                                        + " characters"));
     }
 
     @Test
-    public void getConfigDirectory() {
+    void getConfigDirectory() {
         final String confDir = "/path/of/flink-conf";
         flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
-        assertThat(testingKubernetesParameters.getConfigDirectory(), 
is(confDir));
+        
assertThat(testingKubernetesParameters.getConfigDirectory()).isEqualTo(confDir);
     }
 
     @Test
-    public void getConfigDirectoryFallbackToPodConfDir() {
+    void getConfigDirectoryFallbackToPodConfDir() {
         final String confDirInPod = 
flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
-        assertThat(testingKubernetesParameters.getConfigDirectory(), 
is(confDirInPod));
+        
assertThat(testingKubernetesParameters.getConfigDirectory()).isEqualTo(confDirInPod);
     }
 
     @Test
-    public void 
testGetLocalHadoopConfigurationDirectoryReturnEmptyWhenHadoopEnvIsNotSet()
+    void 
testGetLocalHadoopConfigurationDirectoryReturnEmptyWhenHadoopEnvIsNotSet()
             throws Exception {
         runTestWithEmptyEnv(
                 () -> {
                     final Optional<String> optional =
                             
testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
-                    assertThat(optional.isPresent(), is(false));
+                    assertThat(optional).isNotPresent();
                 });
     }
 
     @Test
-    public void testGetLocalHadoopConfigurationDirectoryFromHadoopConfDirEnv() 
throws Exception {
+    void testGetLocalHadoopConfigurationDirectoryFromHadoopConfDirEnv() throws 
Exception {
         runTestWithEmptyEnv(
                 () -> {
                     final String hadoopConfDir = "/etc/hadoop/conf";
                     setEnv(Constants.ENV_HADOOP_CONF_DIR, hadoopConfDir);
 
                     final Optional<String> optional =
                             
testingKubernetesParameters.getLocalHadoopConfigurationDirectory();
-                    assertThat(optional.isPresent(), is(true));
-                    assertThat(optional.get(), is(hadoopConfDir));
+                    assertThat(optional).isPresent();
+                    assertThat(optional.get()).isEqualTo(hadoopConfDir);
                 });
     }
 
     @Test
-    public void testGetLocalHadoopConfigurationDirectoryFromHadoop2HomeEnv() 
throws Exception {
+    void testGetLocalHadoopConfigurationDirectoryFromHadoop2HomeEnv(@TempDir 
Path temporaryFolder)
+            throws Exception {
         runTestWithEmptyEnv(
                 () -> {
-                    final String hadoopHome = 
temporaryFolder.getRoot().getAbsolutePath();
-                    temporaryFolder.newFolder("etc", "hadoop");
+                    final String hadoopHome = 
temporaryFolder.toAbsolutePath().toString();
+                    
Files.createDirectories(temporaryFolder.resolve("etc/hadoop"));

Review Comment:
   ```suggestion
                       
Files.createDirectories(temporaryFolder.resolve(Paths.get("etc", "hadoop")));
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java:
##########
@@ -106,17 +103,17 @@ protected void onSetup() throws Exception {
     }
 
     @Test
-    public void testGetEnvironments() {
-        assertEquals(customizedEnvs, 
kubernetesTaskManagerParameters.getEnvironments());
+    void testGetEnvironments() {
+        
assertThat(kubernetesTaskManagerParameters.getEnvironments()).isEqualTo(customizedEnvs);
     }
 
     @Test
-    public void testGetEmptyAnnotations() {
-        assertTrue(kubernetesTaskManagerParameters.getAnnotations().isEmpty());
+    void testGetEmptyAnnotations() {
+        
assertThat(kubernetesTaskManagerParameters.getAnnotations().isEmpty()).isTrue();

Review Comment:
   ```suggestion
           
assertThat(kubernetesTaskManagerParameters.getAnnotations()).isEmpty();
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java:
##########
@@ -19,36 +19,33 @@
 package org.apache.flink.kubernetes.kubeclient.resources;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.KubernetesExtension;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.UUID;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * IT Tests for the {@link KubernetesLeaderElector}. Start multiple leader 
contenders currently, one
  * should elect successfully. And if current leader dies, a new one could take 
over.
  */
-public class KubernetesLeaderElectorITCase extends TestLogger {
-
-    @ClassRule public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+public class KubernetesLeaderElectorITCase {
+    @RegisterExtension
+    private static final KubernetesExtension kubernetesExtension = new 
KubernetesExtension();

Review Comment:
   ```suggestion
       static final KubernetesExtension kubernetesExtension = new 
KubernetesExtension();
   ```
   Please also check for occurrences in other files.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java:
##########
@@ -47,23 +47,21 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for {@link KubernetesResourceManagerDriver}. */
-public class KubernetesResourceManagerDriverTest
+class KubernetesResourceManagerDriverTest
         extends ResourceManagerDriverTestBase<KubernetesWorkerNode> {
 
     private static final String CLUSTER_ID = "testing-flink-cluster";
     private static final KubernetesResourceManagerDriverConfiguration
             KUBERNETES_RESOURCE_MANAGER_CONFIGURATION =
                     new 
KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, "localhost:9000");
 
-    @Test
-    public void testOnPodAdded() throws Exception {
+    @org.junit.jupiter.api.Test

Review Comment:
   don't use qualified imports



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -161,7 +163,7 @@ protected String writeKubeConfigForMockKubernetesServer() 
throws Exception {
                                         .build())
                         .withNewCurrentContext(CLUSTER_ID)
                         .build();
-        final File kubeConfigFile = new 
File(temporaryFolder.newFolder(".kube"), "config");
+        final File kubeConfigFile = new 
File(kubeConfPath.resolve(".kube").toFile(), "config");

Review Comment:
   ```suggestion
           final File kubeConfigFile = 
kubeConfPath.resolve(".kube").resolve("config").toFile();
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -138,7 +139,8 @@ protected void generateKerberosFileItems() throws 
IOException {
         KubernetesTestUtils.createTemporyFile("some conf", kerberosDir, 
KRB5_CONF_FILE);
     }
 
-    protected String writeKubeConfigForMockKubernetesServer() throws Exception 
{
+    protected String writeKubeConfigForMockKubernetesServer(@TempDir Path 
kubeConfPath)

Review Comment:
   The annotation shouldn't work here, as it is neither a lifecycle nor test 
method.
   ```suggestion
       protected String writeKubeConfigForMockKubernetesServer(Path 
kubeConfPath)
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java:
##########
@@ -88,11 +87,13 @@ protected void setupFlinkConfig() {
 
     protected void onSetup() throws Exception {}
 
-    @Before
-    public final void setup() throws Exception {
-        flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
-        hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();
-        kerberosDir = temporaryFolder.newFolder().getAbsoluteFile();
+    @BeforeEach
+    private final void setup(

Review Comment:
   ```suggestion
       void setup(
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesWorkerResourceSpecFactoryTest.java:
##########
@@ -22,48 +22,43 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KubernetesWorkerResourceSpecFactory}. */
-public class KubernetesWorkerResourceSpecFactoryTest extends TestLogger {
+class KubernetesWorkerResourceSpecFactoryTest {
 
     @Test
-    public void testGetCpuCoresCommonOption() {
+    void testGetCpuCoresCommonOption() {
         final Configuration configuration = new Configuration();
         configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0);
         configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
         
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR, 
1.5);
         configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
 
-        assertThat(
-                
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
-                is(new CPUResource(1.0)));
+        
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+                .isEqualByComparingTo(new CPUResource(1.0));
     }
 
     @Test
-    public void testGetCpuCoresKubernetesOption() {
+    void testGetCpuCoresKubernetesOption() {
         final Configuration configuration = new Configuration();
         configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU, 2.0);
         
configuration.setDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR, 
1.5);
         configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
 
-        assertThat(
-                
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
-                is(new CPUResource(2.0)));
+        
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+                .isEqualByComparingTo(new CPUResource(2.0));
     }
 
     @Test
-    public void testGetCpuCoresNumSlots() {
+    void testGetCpuCoresNumSlots() {
         final Configuration configuration = new Configuration();
         configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
 
-        assertThat(
-                
KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration),
-                is(new CPUResource(3.0)));
+        
assertThat(KubernetesWorkerResourceSpecFactory.getDefaultCpus(configuration))
+                .isEqualByComparingTo(new CPUResource(3.0));

Review Comment:
   isEqualTo should work just fine.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -67,40 +68,47 @@
  * Tests for recovering from savepoint when Kubernetes HA is enabled. The 
savepoint will be
  * persisted as a checkpoint and stored in the ConfigMap when recovered 
successfully.
  */
-public class KubernetesHighAvailabilityRecoverFromSavepointITCase extends 
TestLogger {
+public class KubernetesHighAvailabilityRecoverFromSavepointITCase {
 
     private static final long TIMEOUT = 60 * 1000;
 
     private static final String CLUSTER_ID = "flink-on-k8s-cluster-" + 
System.currentTimeMillis();
 
     private static final String FLAT_MAP_UID = "my-flat-map";
 
-    @ClassRule public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+    private static Path temporaryPath;

Review Comment:
   this should have the `@TempDir` annotation.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -361,138 +355,121 @@ private void 
testNodePortService(KubernetesConfigOptions.NodePortAddressType add
                     throw new IllegalArgumentException(
                             String.format("Unexpected address type %s.", 
addressType));
             }
-            assertThat(resultEndpoint.get().getAddress(), isIn(expectedIps));
-            assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
+            assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
+            assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
         }
     }
 
     @Test
-    public void testNodePortServiceWithNoMatchingIP() {
+    void testNodePortServiceWithNoMatchingIP() {
         mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
-        assertFalse(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent());
+        
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent()).isFalse();

Review Comment:
   ```suggestion
           
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID)).isNotPresent();
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -130,19 +139,19 @@ public void testRecoverFromSavepoint() throws Exception {
         
assertThat(clusterClient.requestJobResult(jobId).join().isSuccess()).isTrue();
     }
 
-    private Configuration getConfiguration() {
+    private static Configuration getConfiguration() {
         Configuration configuration = new Configuration();
         configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
         configuration.set(
                 HighAvailabilityOptions.HA_MODE,
                 KubernetesHaServicesFactory.class.getCanonicalName());
         try {
-            configuration.set(
-                    HighAvailabilityOptions.HA_STORAGE_PATH,
-                    temporaryFolder.newFolder().getAbsolutePath());
+            temporaryPath = Files.createTempDirectory("haStorage");
         } catch (IOException e) {
-            throw new FlinkRuntimeException("Failed to create HA storage", e);
+            throw new RuntimeException("can't create ha storage path.");

Review Comment:
   why are you changing this?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -399,31 +406,38 @@ public void 
testReplaceWithDeletingKeyWithFailingDiscard() throws Exception {
                             final TestingLongStateHandleHelper.LongStateHandle 
newState =
                                     new 
TestingLongStateHandleHelper.LongStateHandle(23456L);
 
-                            assertThat(store.exists(key), 
is(StringResourceVersion.notExisting()));
-                            final StateHandleStore.NotExistException exception 
=
-                                    assertThrows(
-                                            
StateHandleStore.NotExistException.class,
+                            assertThat(store.exists(key))
+                                    
.isEqualByComparingTo(StringResourceVersion.notExisting());
+                            StateHandleStore.NotExistException exception =
+                                    catchThrowableOfType(

Review Comment:
   This can still be modeled as `assertThatThrownBy().satisfies(...);`



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java:
##########
@@ -67,40 +68,47 @@
  * Tests for recovering from savepoint when Kubernetes HA is enabled. The 
savepoint will be
  * persisted as a checkpoint and stored in the ConfigMap when recovered 
successfully.
  */
-public class KubernetesHighAvailabilityRecoverFromSavepointITCase extends 
TestLogger {
+public class KubernetesHighAvailabilityRecoverFromSavepointITCase {
 
     private static final long TIMEOUT = 60 * 1000;
 
     private static final String CLUSTER_ID = "flink-on-k8s-cluster-" + 
System.currentTimeMillis();
 
     private static final String FLAT_MAP_UID = "my-flat-map";
 
-    @ClassRule public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+    private static Path temporaryPath;
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
-
-    @Rule
-    public MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    @Order(1)

Review Comment:
   why is the order relevant?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientITCase.java:
##########
@@ -69,9 +67,9 @@ public class Fabric8FlinkKubeClientITCase extends TestLogger {
 
     private ExecutorService executorService;
 
-    @Before
-    public void setup() throws Exception {
-        flinkKubeClient = kubernetesResource.getFlinkKubeClient();
+    @BeforeEach
+    private void setup() throws Exception {

Review Comment:
   ```suggestion
       void setup() throws Exception {
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java:
##########
@@ -77,20 +75,19 @@ public void testDynamicProperties() throws Exception {
         final Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
         final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
 
-        Assert.assertNotNull(clientFactory);
+        assertThat(clientFactory).isNotNull();
 
         final Map<String, String> executorConfigMap = executorConfig.toMap();
-        assertEquals(4, executorConfigMap.size());
-        assertEquals("5 min", executorConfigMap.get("akka.ask.timeout"));
-        assertEquals("-DappName=foobar", 
executorConfigMap.get("env.java.opts"));
-        assertEquals(
-                tmp.getRoot().getAbsolutePath(),
-                executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
-        
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
+        assertThat(executorConfigMap).hasSize(4);
+        assertThat(executorConfigMap.get("akka.ask.timeout")).isEqualTo("5 
min");
+        
assertThat(executorConfigMap.get("env.java.opts")).isEqualTo("-DappName=foobar");
+        assertThat(confDirPath.toAbsolutePath().toString())
+                
.isEqualTo(executorConfig.get(DeploymentOptionsInternal.CONF_DIR));

Review Comment:
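   Invert the assertion: the value under test 
   (`executorConfig.get(DeploymentOptionsInternal.CONF_DIR)`) belongs in 
   `assertThat(...)`, and the expected path belongs in `isEqualTo(...)`.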



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java:
##########
@@ -575,38 +594,40 @@ public void 
testReplaceFailedWithPossiblyInconsistentState() throws Exception {
                                                     LOCK_IDENTITY);
 
                             final StringResourceVersion resourceVersion = 
anotherStore.exists(key);
-                            assertThat(resourceVersion.isExisting(), is(true));
-                            try {
-                                anotherStore.replace(
-                                        key,
-                                        resourceVersion,
-                                        new 
TestingLongStateHandleHelper.LongStateHandle(23456L));
-                                fail(
-                                        "An exception having a 
PossibleInconsistentStateException as its cause should have been thrown.");
-                            } catch (Exception ex) {
-                                assertThat(ex, is(updateException));
-                            }
-                            assertThat(anotherStore.getAllAndLock().size(), 
is(1));
+                            assertThat(resourceVersion.isExisting()).isTrue();
+                            assertThatThrownBy(
+                                            () ->
+                                                    anotherStore.replace(
+                                                            key,
+                                                            resourceVersion,
+                                                            new 
TestingLongStateHandleHelper
+                                                                    
.LongStateHandle(23456L)),
+                                            "An exception having a 
PossibleInconsistentStateException as its cause should have been thrown.")
+                                    .satisfies(
+                                            cause -> 
assertThat(cause).isEqualTo(updateException));

Review Comment:
   ```suggestion
                                       .isEqualTo(updateException);
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -361,138 +355,121 @@ private void 
testNodePortService(KubernetesConfigOptions.NodePortAddressType add
                     throw new IllegalArgumentException(
                             String.format("Unexpected address type %s.", 
addressType));
             }
-            assertThat(resultEndpoint.get().getAddress(), isIn(expectedIps));
-            assertThat(resultEndpoint.get().getPort(), is(NODE_PORT));
+            assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
+            assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
         }
     }
 
     @Test
-    public void testNodePortServiceWithNoMatchingIP() {
+    void testNodePortServiceWithNoMatchingIP() {
         mockExpectedServiceFromServerSide(buildExternalServiceWithNodePort());
-        assertFalse(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent());
+        
assertThat(flinkKubeClient.getRestEndpoint(CLUSTER_ID).isPresent()).isFalse();
     }
 
     @Test
-    public void testClusterIPService() {
+    void testClusterIPService() {
         mockExpectedServiceFromServerSide(buildExternalServiceWithClusterIP());
 
         final Optional<Endpoint> resultEndpoint = 
flinkKubeClient.getRestEndpoint(CLUSTER_ID);
-        assertThat(resultEndpoint.isPresent(), is(true));
-        assertThat(
-                resultEndpoint.get().getAddress(),
-                is(
+        assertThat(resultEndpoint).isPresent();
+        assertThat(resultEndpoint.get().getAddress())
+                .isEqualTo(
                         
ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                CLUSTER_ID, NAMESPACE)));
-        assertThat(resultEndpoint.get().getPort(), is(REST_PORT));
+                                CLUSTER_ID, NAMESPACE));
+        assertThat(resultEndpoint.get().getPort()).isEqualTo(REST_PORT);
     }
 
     @Test
-    public void testStopAndCleanupCluster() throws Exception {
+    void testStopAndCleanupCluster() throws Exception {
         
this.flinkKubeClient.createJobManagerComponent(this.kubernetesJobManagerSpecification);
 
         final KubernetesPod kubernetesPod = 
buildKubernetesPod(TASKMANAGER_POD_NAME);
         this.flinkKubeClient.createTaskManagerPod(kubernetesPod).get();
 
-        assertEquals(
-                1,
-                this.kubeClient
-                        .apps()
-                        .deployments()
-                        .inNamespace(NAMESPACE)
-                        .list()
-                        .getItems()
-                        .size());
-        assertEquals(
-                1, 
this.kubeClient.configMaps().inNamespace(NAMESPACE).list().getItems().size());
-        assertEquals(2, 
this.kubeClient.services().inNamespace(NAMESPACE).list().getItems().size());
-        assertEquals(1, 
this.kubeClient.pods().inNamespace(NAMESPACE).list().getItems().size());
+        assertThat(
+                        this.kubeClient
+                                .apps()
+                                .deployments()
+                                .inNamespace(NAMESPACE)
+                                .list()
+                                .getItems()
+                                .size())
+                .isEqualTo(1);
+        
assertThat(this.kubeClient.configMaps().inNamespace(NAMESPACE).list().getItems().size())
+                .isEqualTo(1);
+        
assertThat(this.kubeClient.services().inNamespace(NAMESPACE).list().getItems()).hasSize(2);
+        
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).list().getItems()).hasSize(1);
 
         this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
-        assertTrue(
-                this.kubeClient
-                        .apps()
-                        .deployments()
-                        .inNamespace(NAMESPACE)
-                        .list()
-                        .getItems()
-                        .isEmpty());
+        
assertThat(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems())
+                .isEmpty();
     }
 
     @Test
-    public void testCreateConfigMap() throws Exception {
+    void testCreateConfigMap() throws Exception {
         final KubernetesConfigMap configMap = buildTestingConfigMap();
         this.flinkKubeClient.createConfigMap(configMap).get();
         final Optional<KubernetesConfigMap> currentOpt =
                 this.flinkKubeClient.getConfigMap(TESTING_CONFIG_MAP_NAME);
-        assertThat(currentOpt.isPresent(), is(true));
-        assertThat(
-                currentOpt.get().getData().get(TESTING_CONFIG_MAP_KEY),
-                is(TESTING_CONFIG_MAP_VALUE));
+        assertThat(currentOpt).isPresent();
+        assertThat(currentOpt.get().getData())
+                .containsEntry(TESTING_CONFIG_MAP_KEY, 
TESTING_CONFIG_MAP_VALUE);

Review Comment:
   These two assertions could be combined into a single `assertThat(currentOpt).hasValueSatisfying(...)` call.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
 
     @Test
-    public void testCopyFlinkPod() {
+    void testCopyFlinkPod() {
         final FlinkPod flinkPod =
                 KubernetesUtils.loadPodFromTemplateFile(
                         flinkKubeClient,
                         KubernetesPodTemplateTestUtils.getPodTemplateFile(),
                         
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
         final FlinkPod copiedFlinkPod = flinkPod.copy();
-        assertThat(flinkPod == copiedFlinkPod, is(false));
+        assertThat(flinkPod == copiedFlinkPod).isFalse();
         assertThat(
-                flinkPod.getPodWithoutMainContainer()
-                        == copiedFlinkPod.getPodWithoutMainContainer(),
-                is(false));
-        assertThat(
-                flinkPod.getPodWithoutMainContainer(),
-                is(equalTo(copiedFlinkPod.getPodWithoutMainContainer())));
-        assertThat(flinkPod.getMainContainer() == 
copiedFlinkPod.getMainContainer(), is(false));
-        assertThat(flinkPod.getMainContainer(), 
is(equalTo(copiedFlinkPod.getMainContainer())));
+                        flinkPod.getPodWithoutMainContainer()

Review Comment:
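   Assert on the objects directly instead of on the boolean result of `==`: isNotEqualTo, or more precisely `isNotSameAs`, since the original check is a reference comparison and this test also asserts that the two pods are `equalTo` each other.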



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
 
     @Test
-    public void testCopyFlinkPod() {
+    void testCopyFlinkPod() {
         final FlinkPod flinkPod =
                 KubernetesUtils.loadPodFromTemplateFile(
                         flinkKubeClient,
                         KubernetesPodTemplateTestUtils.getPodTemplateFile(),
                         
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
         final FlinkPod copiedFlinkPod = flinkPod.copy();
-        assertThat(flinkPod == copiedFlinkPod, is(false));
+        assertThat(flinkPod == copiedFlinkPod).isFalse();
         assertThat(
-                flinkPod.getPodWithoutMainContainer()
-                        == copiedFlinkPod.getPodWithoutMainContainer(),
-                is(false));
-        assertThat(
-                flinkPod.getPodWithoutMainContainer(),
-                is(equalTo(copiedFlinkPod.getPodWithoutMainContainer())));
-        assertThat(flinkPod.getMainContainer() == 
copiedFlinkPod.getMainContainer(), is(false));
-        assertThat(flinkPod.getMainContainer(), 
is(equalTo(copiedFlinkPod.getMainContainer())));
+                        flinkPod.getPodWithoutMainContainer()
+                                == copiedFlinkPod.getPodWithoutMainContainer())
+                .isFalse();
+        assertThat(flinkPod.getPodWithoutMainContainer())
+                .isEqualTo(copiedFlinkPod.getPodWithoutMainContainer());
+        assertThat(flinkPod.getMainContainer() == 
copiedFlinkPod.getMainContainer()).isFalse();

Review Comment:
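   Assert on the objects directly instead of on the boolean result of `==`: isNotEqualTo, or more precisely `isNotSameAs`, since the original check is a reference comparison and the old code also asserted that the two main containers are `equalTo` each other.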



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/FlinkPodTest.java:
##########
@@ -23,32 +23,29 @@
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FlinkPod}. */
-public class FlinkPodTest extends KubernetesTestBase {
+class FlinkPodTest extends KubernetesTestBase {
 
     @Test
-    public void testCopyFlinkPod() {
+    void testCopyFlinkPod() {
         final FlinkPod flinkPod =
                 KubernetesUtils.loadPodFromTemplateFile(
                         flinkKubeClient,
                         KubernetesPodTemplateTestUtils.getPodTemplateFile(),
                         
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
         final FlinkPod copiedFlinkPod = flinkPod.copy();
-        assertThat(flinkPod == copiedFlinkPod, is(false));
+        assertThat(flinkPod == copiedFlinkPod).isFalse();

Review Comment:
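   Assert on the objects directly instead of on the boolean result of `==`: isNotEqualTo, or more precisely `isNotSameAs`, since the original check is a reference comparison.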



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java:
##########
@@ -75,25 +71,23 @@ public void testCallbackHandler() {
         podsWatcher.eventReceived(Watcher.Action.DELETED, 
pod.getPodWithoutMainContainer());
         podsWatcher.eventReceived(Watcher.Action.ERROR, 
pod.getPodWithoutMainContainer());
 
-        assertThat(podAddedList.size(), is(1));
-        assertThat(podModifiedList.size(), is(1));
-        assertThat(podDeletedList.size(), is(1));
-        assertThat(podErrorList.size(), is(1));
+        assertThat(podAddedList).hasSize(1);
+        assertThat(podModifiedList).hasSize(1);
+        assertThat(podDeletedList).hasSize(1);
+        assertThat(podErrorList).hasSize(1);
     }
 
     @Test
-    public void testClosingWithTooOldResourceVersion() {
+    void testClosingWithTooOldResourceVersion() {
         final String errMsg = "too old resource version";
         final KubernetesPodsWatcher podsWatcher =
                 new KubernetesPodsWatcher(
                         new TestingCallbackHandler(
                                 e -> {
-                                    assertThat(
-                                            e,
-                                            Matchers.instanceOf(
-                                                    
KubernetesTooOldResourceVersionException
-                                                            .class));
-                                    assertThat(e, 
FlinkMatchers.containsMessage(errMsg));
+                                    assertThat(e)
+                                            .isInstanceOf(
+                                                    
KubernetesTooOldResourceVersionException.class);
+                                    assertThat(e).hasMessageContaining(errMsg);

Review Comment:
   ```suggestion
                                       assertThat(e)
                                               .isInstanceOf(
                                                       
KubernetesTooOldResourceVersionException.class)
                                               .hasMessageContaining(errMsg);
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java:
##########
@@ -76,16 +72,16 @@ public void testGetEnvironments() {
         final Map<String, String> resultEnvironments =
                 kubernetesJobManagerParameters.getEnvironments();
 
-        assertEquals(expectedEnvironments, resultEnvironments);
+        assertThat(resultEnvironments).isEqualTo(expectedEnvironments);
     }
 
     @Test
-    public void testGetEmptyAnnotations() {
-        assertTrue(kubernetesJobManagerParameters.getAnnotations().isEmpty());
+    void testGetEmptyAnnotations() {
+        
assertThat(kubernetesJobManagerParameters.getAnnotations().isEmpty()).isTrue();

Review Comment:
   ```suggestion
           
assertThat(kubernetesJobManagerParameters.getAnnotations()).isEmpty();
   ```



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -571,35 +543,34 @@ public void 
testCheckAndUpdateConfigMapWhenGetConfigMapFailed() throws Exception
         mockGetConfigMapFailed(configMap.getInternalResource());
 
         final int initialRequestCount = server.getRequestCount();
-        try {
-            this.flinkKubeClient
-                    .checkAndUpdateConfigMap(
-                            TESTING_CONFIG_MAP_NAME,
-                            c -> {
-                                throw new AssertionError(
-                                        "The replace operation should have 
never been triggered.");
-                            })
-                    .get();
-            fail(
-                    "checkAndUpdateConfigMap should fail without a 
PossibleInconsistentStateException being the cause when number of retries has 
been exhausted.");
-        } catch (Exception ex) {
-            assertThat(
-                    ex,
-                    FlinkMatchers.containsMessage(
-                            "Could not complete the "
-                                    + "operation. Number of retries has been 
exhausted."));
-            final int actualRetryCount = server.getRequestCount() - 
initialRequestCount;
-            assertThat(actualRetryCount, is(configuredRetries + 1));
-            assertThat(
-                    "An error while retrieving the ConfigMap should not cause 
a PossibleInconsistentStateException.",
-                    ExceptionUtils.findThrowable(ex, 
PossibleInconsistentStateException.class)
-                            .isPresent(),
-                    is(false));
-        }
+        assertThatThrownBy(
+                        () ->
+                                this.flinkKubeClient
+                                        .checkAndUpdateConfigMap(
+                                                TESTING_CONFIG_MAP_NAME,
+                                                c -> {
+                                                    throw new AssertionError(
+                                                            "The replace 
operation should have never been triggered.");
+                                                })
+                                        .get(),
+                        "checkAndUpdateConfigMap should fail without a 
PossibleInconsistentStateException being the cause when number of retries has 
been exhausted.")
+                .satisfies(
+                        anyCauseMatches(
+                                "Could not complete the "
+                                        + "operation. Number of retries has 
been exhausted."))

Review Comment:
   ```suggestion
                                   "Could not complete the operation. Number of 
retries has been exhausted."))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to