This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 64c725f4a6e [FLINK-32837][JUnit5 Migration] Migrate the client and
clusterframework packages of flink-runtime module to junit5 (#23241)
64c725f4a6e is described below
commit 64c725f4a6ed0073a0559f4b2eadaf909608abf2
Author: Rui Fan <[email protected]>
AuthorDate: Wed Aug 23 21:51:47 2023 +0800
[FLINK-32837][JUnit5 Migration] Migrate the client and clusterframework
packages of flink-runtime module to junit5 (#23241)
---
.../flink/runtime/client/ClientUtilsTest.java | 100 ++-
.../client/SerializedJobExecutionResultTest.java | 74 +-
.../clusterframework/ApplicationStatusTest.java | 27 +-
.../clusterframework/BootstrapToolsTest.java | 844 ++++++++++-----------
.../TaskExecutorProcessUtilsTest.java | 357 ++++-----
.../types/ResourceBudgetManagerTest.java | 34 +-
.../types/ResourceProfileTest.java | 235 +++---
.../jobmanager/JobManagerProcessUtilsTest.java | 173 ++---
8 files changed, 890 insertions(+), 954 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
index 78a25aff5f1..6d600a112a3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -28,47 +28,49 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ClientUtils}. */
-public class ClientUtilsTest extends TestLogger {
+public class ClientUtilsTest {
- @ClassRule public static TemporaryFolder temporaryFolder = new
TemporaryFolder();
+ @TempDir private static java.nio.file.Path temporaryFolder;
private static BlobServer blobServer = null;
- @BeforeClass
- public static void setup() throws IOException {
+ @BeforeAll
+ static void setup() throws IOException {
Configuration config = new Configuration();
- blobServer = new BlobServer(config, temporaryFolder.newFolder(), new
VoidBlobStore());
+ blobServer =
+ new BlobServer(
+ config, TempDirUtils.newFolder(temporaryFolder), new
VoidBlobStore());
blobServer.start();
}
- @AfterClass
- public static void teardown() throws IOException {
+ @AfterAll
+ static void teardown() throws IOException {
if (blobServer != null) {
blobServer.close();
}
}
@Test
- public void uploadAndSetUserJars() throws Exception {
- java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+ void uploadAndSetUserJars() throws Exception {
+ java.nio.file.Path tmpDir =
TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
Collection<Path> jars =
@@ -78,8 +80,8 @@ public class ClientUtilsTest extends TestLogger {
jars.forEach(jobGraph::addJar);
- assertEquals(jars.size(), jobGraph.getUserJars().size());
- assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+ assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
+ assertThat(jobGraph.getUserJarBlobKeys()).isEmpty();
ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
@@ -88,9 +90,9 @@ public class ClientUtilsTest extends TestLogger {
new InetSocketAddress("localhost",
blobServer.getPort()),
new Configuration()));
- assertEquals(jars.size(), jobGraph.getUserJars().size());
- assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
- assertEquals(jars.size(),
jobGraph.getUserJarBlobKeys().stream().distinct().count());
+ assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
+ assertThat(jobGraph.getUserJarBlobKeys()).hasSameSizeAs(jars);
+
assertThat(jobGraph.getUserJarBlobKeys().stream().distinct()).hasSameSizeAs(jars);
for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
blobServer.getFile(jobGraph.getJobID(), blobKey);
@@ -98,8 +100,8 @@ public class ClientUtilsTest extends TestLogger {
}
@Test
- public void uploadAndSetUserArtifacts() throws Exception {
- java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+ void uploadAndSetUserArtifacts() throws Exception {
+ java.nio.file.Path tmpDir =
TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
Collection<DistributedCache.DistributedCacheEntry> localArtifacts =
@@ -114,7 +116,7 @@ public class ClientUtilsTest extends TestLogger {
Files.createFile(tmpDir.resolve("art4")).toString(), true, false));
Collection<DistributedCache.DistributedCacheEntry>
distributedArtifacts =
- Arrays.asList(
+ Collections.singletonList(
new DistributedCache.DistributedCacheEntry(
"hdfs://localhost:1234/test", true, false));
@@ -127,12 +129,11 @@ public class ClientUtilsTest extends TestLogger {
final int totalNumArtifacts = localArtifacts.size() +
distributedArtifacts.size();
- assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
- assertEquals(
- 0,
- jobGraph.getUserArtifacts().values().stream()
- .filter(entry -> entry.blobKey != null)
- .count());
+ assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
+ assertThat(
+ jobGraph.getUserArtifacts().values().stream()
+ .filter(entry -> entry.blobKey != null))
+ .isEmpty();
ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
@@ -141,24 +142,21 @@ public class ClientUtilsTest extends TestLogger {
new InetSocketAddress("localhost",
blobServer.getPort()),
new Configuration()));
- assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
- assertEquals(
- localArtifacts.size(),
- jobGraph.getUserArtifacts().values().stream()
- .filter(entry -> entry.blobKey != null)
- .count());
- assertEquals(
- distributedArtifacts.size(),
- jobGraph.getUserArtifacts().values().stream()
- .filter(entry -> entry.blobKey == null)
- .count());
+ assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
+ assertThat(
+ jobGraph.getUserArtifacts().values().stream()
+ .filter(entry -> entry.blobKey != null))
+ .hasSameSizeAs(localArtifacts);
+ assertThat(
+ jobGraph.getUserArtifacts().values().stream()
+ .filter(entry -> entry.blobKey == null))
+ .hasSameSizeAs(distributedArtifacts);
// 1 unique key for each local artifact, and null for distributed
artifacts
- assertEquals(
- localArtifacts.size() + 1,
- jobGraph.getUserArtifacts().values().stream()
- .map(entry -> entry.blobKey)
- .distinct()
- .count());
+ assertThat(
+ jobGraph.getUserArtifacts().values().stream()
+ .map(entry -> entry.blobKey)
+ .distinct())
+ .hasSize(localArtifacts.size() + 1);
for (DistributedCache.DistributedCacheEntry original : localArtifacts)
{
assertState(
original,
@@ -181,10 +179,10 @@ public class ClientUtilsTest extends TestLogger {
boolean isBlobKeyNull,
JobID jobId)
throws Exception {
- assertEquals(original.isZipped, actual.isZipped);
- assertEquals(original.isExecutable, actual.isExecutable);
- assertEquals(original.filePath, actual.filePath);
- assertEquals(isBlobKeyNull, actual.blobKey == null);
+ assertThat(actual.isZipped).isEqualTo(original.isZipped);
+ assertThat(actual.isExecutable).isEqualTo(original.isExecutable);
+ assertThat(actual.filePath).isEqualTo(original.filePath);
+ assertThat(actual.blobKey == null).isEqualTo(isBlobKeyNull);
if (!isBlobKeyNull) {
blobServer.getFile(
jobId,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index 83101dba951..4188b19e399 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -22,28 +22,24 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the SerializedJobExecutionResult */
-public class SerializedJobExecutionResultTest extends TestLogger {
+class SerializedJobExecutionResultTest {
@Test
- public void testSerialization() throws Exception {
+ void testSerialization() throws Exception {
final ClassLoader classloader = getClass().getClassLoader();
JobID origJobId = new JobID();
@@ -62,61 +58,53 @@ public class SerializedJobExecutionResultTest extends
TestLogger {
// serialize and deserialize the object
SerializedJobExecutionResult cloned =
CommonTestUtils.createCopySerializable(result);
- assertEquals(origJobId, cloned.getJobId());
- assertEquals(origTime, cloned.getNetRuntime());
- assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
- assertEquals(origMap, cloned.getSerializedAccumulatorResults());
+ assertThat(cloned.getJobId()).isEqualTo(origJobId);
+ assertThat(cloned.getNetRuntime()).isEqualTo(origTime);
+
assertThat(cloned.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
+
assertThat(cloned.getSerializedAccumulatorResults()).isEqualTo(origMap);
// convert to deserialized result
JobExecutionResult jResult = result.toJobExecutionResult(classloader);
JobExecutionResult jResultCopied =
result.toJobExecutionResult(classloader);
- assertEquals(origJobId, jResult.getJobID());
- assertEquals(origJobId, jResultCopied.getJobID());
- assertEquals(origTime, jResult.getNetRuntime());
- assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
- assertEquals(origTime, jResultCopied.getNetRuntime());
- assertEquals(origTime,
jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
+ assertThat(jResult.getJobID()).isEqualTo(origJobId);
+ assertThat(jResultCopied.getJobID()).isEqualTo(origJobId);
+ assertThat(jResult.getNetRuntime()).isEqualTo(origTime);
+
assertThat(jResult.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
+ assertThat(jResultCopied.getNetRuntime()).isEqualTo(origTime);
+
assertThat(jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry
:
origMap.entrySet()) {
String name = entry.getKey();
OptionalFailure<Object> value =
entry.getValue().deserializeValue(classloader);
if (value.isFailure()) {
- try {
- jResult.getAccumulatorResult(name);
- fail("expected failure");
- } catch (FlinkRuntimeException ex) {
- assertTrue(
- ExceptionUtils.findThrowable(ex,
ExpectedTestException.class)
- .isPresent());
- }
- try {
- jResultCopied.getAccumulatorResult(name);
- fail("expected failure");
- } catch (FlinkRuntimeException ex) {
- assertTrue(
- ExceptionUtils.findThrowable(ex,
ExpectedTestException.class)
- .isPresent());
- }
+ assertThatThrownBy(() -> jResult.getAccumulatorResult(name))
+ .isInstanceOf(FlinkRuntimeException.class)
+ .hasCauseInstanceOf(ExpectedTestException.class);
+
+ assertThatThrownBy(() ->
jResultCopied.getAccumulatorResult(name))
+ .isInstanceOf(FlinkRuntimeException.class)
+ .hasCauseInstanceOf(ExpectedTestException.class);
} else {
- assertEquals(value.get(), jResult.getAccumulatorResult(name));
- assertEquals(value.get(),
jResultCopied.getAccumulatorResult(name));
+ assertThat((Object)
jResult.getAccumulatorResult(name)).isEqualTo(value.get());
+ assertThat((Object) jResultCopied.getAccumulatorResult(name))
+ .isEqualTo(value.get());
}
}
}
@Test
- public void testSerializationWithNullValues() throws Exception {
+ void testSerializationWithNullValues() throws Exception {
SerializedJobExecutionResult result = new
SerializedJobExecutionResult(null, 0L, null);
SerializedJobExecutionResult cloned =
CommonTestUtils.createCopySerializable(result);
- assertNull(cloned.getJobId());
- assertEquals(0L, cloned.getNetRuntime());
- assertNull(cloned.getSerializedAccumulatorResults());
+ assertThat(cloned.getJobId()).isNull();
+ assertThat(cloned.getNetRuntime()).isZero();
+ assertThat(cloned.getSerializedAccumulatorResults()).isNull();
JobExecutionResult jResult =
result.toJobExecutionResult(getClass().getClassLoader());
- assertNull(jResult.getJobID());
- assertTrue(jResult.getAllAccumulatorResults().isEmpty());
+ assertThat(jResult.getJobID()).isNull();
+ assertThat(jResult.getAllAccumulatorResults()).isEmpty();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
index 6cc225d0001..507c0a4fe26 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ApplicationStatusTest.java
@@ -19,10 +19,8 @@
package org.apache.flink.runtime.clusterframework;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -34,64 +32,63 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link ApplicationStatus}. */
-@ExtendWith(TestLoggerExtension.class)
-public class ApplicationStatusTest {
+class ApplicationStatusTest {
private static final int SUCCESS_EXIT_CODE = 0;
@Test
- public void succeededStatusMapsToSuccessExitCode() {
+ void succeededStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.SUCCEEDED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}
@Test
- public void cancelledStatusMapsToSuccessExitCode() {
+ void cancelledStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.CANCELED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}
@Test
- public void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
+ void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
Iterable<Integer> exitCodes =
exitCodes(notSucceededNorCancelledStatus());
assertThat(exitCodes).doesNotContain(SUCCESS_EXIT_CODE);
}
@Test
- public void testJobStatusFromSuccessApplicationStatus() {
+ void testJobStatusFromSuccessApplicationStatus() {
assertThat(ApplicationStatus.SUCCEEDED.deriveJobStatus()).isEqualTo(JobStatus.FINISHED);
}
@Test
- public void testJobStatusFromFailedApplicationStatus() {
+ void testJobStatusFromFailedApplicationStatus() {
assertThat(ApplicationStatus.FAILED.deriveJobStatus()).isEqualTo(JobStatus.FAILED);
}
@Test
- public void testJobStatusFromCancelledApplicationStatus() {
+ void testJobStatusFromCancelledApplicationStatus() {
assertThat(ApplicationStatus.CANCELED.deriveJobStatus()).isEqualTo(JobStatus.CANCELED);
}
@Test
- public void testJobStatusFailsFromUnknownApplicationStatuses() {
+ void testJobStatusFailsFromUnknownApplicationStatuses() {
assertThatThrownBy(ApplicationStatus.UNKNOWN::deriveJobStatus)
.isInstanceOf(UnsupportedOperationException.class);
}
@Test
- public void testSuccessApplicationStatusFromJobStatus() {
+ void testSuccessApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FINISHED))
.isEqualTo(ApplicationStatus.SUCCEEDED);
}
@Test
- public void testFailedApplicationStatusFromJobStatus() {
+ void testFailedApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FAILED))
.isEqualTo(ApplicationStatus.FAILED);
}
@Test
- public void testCancelledApplicationStatusFromJobStatus() {
+ void testCancelledApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.CANCELED))
.isEqualTo(ApplicationStatus.CANCELED);
}
@@ -114,7 +111,7 @@ public class ApplicationStatusTest {
}
@Test
- public void testUnknownApplicationStatusForMissingJobStatus() {
+ void testUnknownApplicationStatusForMissingJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 2012494d5bb..2315595e7c5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -27,15 +27,11 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
@@ -47,21 +43,15 @@ import java.util.List;
import java.util.Map;
import static
org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link BootstrapTools}. */
-public class BootstrapToolsTest extends TestLogger {
+class BootstrapToolsTest {
- private static final Logger LOG =
LoggerFactory.getLogger(BootstrapToolsTest.class);
-
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir private static java.nio.file.Path temporaryFolder;
@Test
- public void testSubstituteConfigKey() {
+ void testSubstituteConfigKey() {
String deprecatedKey1 = "deprecated-key";
String deprecatedKey2 = "another-out_of-date_key";
String deprecatedKey3 = "yet-one-more";
@@ -85,18 +75,18 @@ public class BootstrapToolsTest extends TestLogger {
BootstrapTools.substituteDeprecatedConfigKey(cfg, deprecatedKey3,
designatedKey3);
// value 1 should be set to designated
- assertEquals(value1, cfg.getString(designatedKey1, null));
+ assertThat(cfg.getString(designatedKey1, null)).isEqualTo(value1);
// value 2 should not have been set, since it had a value already
- assertEquals(value2Designated, cfg.getString(designatedKey2, null));
+ assertThat(cfg.getString(designatedKey2,
null)).isEqualTo(value2Designated);
// nothing should be in there for key 3
- assertNull(cfg.getString(designatedKey3, null));
- assertNull(cfg.getString(deprecatedKey3, null));
+ assertThat(cfg.getString(designatedKey3, null)).isNull();
+ assertThat(cfg.getString(deprecatedKey3, null)).isNull();
}
@Test
- public void testSubstituteConfigKeyPrefix() {
+ void testSubstituteConfigKeyPrefix() {
String deprecatedPrefix1 = "deprecated-prefix";
String deprecatedPrefix2 = "-prefix-2";
String deprecatedPrefix3 = "prefix-3";
@@ -129,19 +119,19 @@ public class BootstrapToolsTest extends TestLogger {
BootstrapTools.substituteDeprecatedConfigPrefix(cfg,
deprecatedPrefix2, designatedPrefix2);
BootstrapTools.substituteDeprecatedConfigPrefix(cfg,
deprecatedPrefix3, designatedPrefix3);
- assertEquals(val1, cfg.getString(desig1, null));
- assertEquals(val2, cfg.getString(desig2, null));
- assertEquals(val3Desig, cfg.getString(desig3, null));
+ assertThat(cfg.getString(desig1, null)).isEqualTo(val1);
+ assertThat(cfg.getString(desig2, null)).isEqualTo(val2);
+ assertThat(cfg.getString(desig3, null)).isEqualTo(val3Desig);
// check that nothing with prefix 3 is contained
for (String key : cfg.keySet()) {
- assertFalse(key.startsWith(designatedPrefix3));
- assertFalse(key.startsWith(deprecatedPrefix3));
+ assertThat(key.startsWith(designatedPrefix3)).isFalse();
+ assertThat(key.startsWith(deprecatedPrefix3)).isFalse();
}
}
@Test
- public void testGetTaskManagerShellCommand() {
+ void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
final TaskExecutorProcessSpec taskExecutorProcessSpec =
new TaskExecutorProcessSpec(
@@ -156,8 +146,7 @@ public class BootstrapToolsTest extends TestLogger {
new MemorySize(0), // jvmOverheadSize
Collections.emptyList());
final ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(
- taskExecutorProcessSpec, new HashMap<String,
String>());
+ new ContaineredTaskManagerParameters(taskExecutorProcessSpec,
new HashMap<>());
// no logging, with/out krb5
final String java = "$JAVA_HOME/bin/java";
@@ -178,407 +167,423 @@ public class BootstrapToolsTest extends TestLogger {
final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
final String redirects = "1> ./logs/taskmanager.out 2>
./logs/taskmanager.err";
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- mainClass,
- dynamicConfigs,
- basicArgs,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- false,
- false,
- false,
- this.getClass(),
- ""));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- false,
- false,
- false,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ false,
+ false,
+ false,
+ this.getClass(),
+ ""))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ mainClass,
+ dynamicConfigs,
+ basicArgs,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ false,
+ false,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ mainClass,
+ args,
+ redirects));
final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- false,
- false,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ false,
+ false,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ mainClass,
+ args,
+ redirects));
// logback only, with/out krb5
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- logfile,
- logback,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- false,
- false,
- this.getClass(),
- mainArgs));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- logfile,
- logback,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- false,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ false,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ logfile,
+ logback,
+ mainClass,
+ args,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ false,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ logfile,
+ logback,
+ mainClass,
+ args,
+ redirects));
// log4j, with/out krb5
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- logfile,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- false,
- true,
- false,
- this.getClass(),
- mainArgs));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- logfile,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- false,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ false,
+ true,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ logfile,
+ log4j,
+ mainClass,
+ args,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ false,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ logfile,
+ log4j,
+ mainClass,
+ args,
+ redirects));
// logback + log4j, with/out krb5
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- false,
- this.getClass(),
- mainArgs));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
// logback + log4j, with/out krb5, different JVM opts
cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- jvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- false,
- this.getClass(),
- mainArgs));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- jvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ jvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ jvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
// logback + log4j, with/out krb5, different JVM opts
cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- jvmOpts,
- tmJvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- false,
- this.getClass(),
- mainArgs));
-
- assertEquals(
- String.join(
- " ",
- java,
- jvmmem,
- jvmOpts,
- tmJvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- logfile,
- logback,
- log4j,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ false,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ jvmOpts,
+ tmJvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
+
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ jvmmem,
+ jvmOpts,
+ tmJvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ logfile,
+ logback,
+ log4j,
+ mainClass,
+ args,
+ redirects));
// now try some configurations with different
yarn.container-start-command-template
cfg.setString(
ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args%
6 %redirects%");
- assertEquals(
- String.join(
- " ",
- java,
- "1",
- jvmmem,
- "2",
- jvmOpts,
- tmJvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- "3",
- logfile,
- logback,
- log4j,
- "4",
- mainClass,
- "5",
- args,
- "6",
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ "1",
+ jvmmem,
+ "2",
+ jvmOpts,
+ tmJvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ "3",
+ logfile,
+ logback,
+ log4j,
+ "4",
+ mainClass,
+ "5",
+ args,
+ "6",
+ redirects));
cfg.setString(
ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
"%java% %logging% %jvmopts% %jvmmem% %class% %args%
%redirects%");
- assertEquals(
- String.join(
- " ",
- java,
- logfile,
- logback,
- log4j,
- jvmOpts,
- tmJvmOpts,
- BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
- krb5,
- jvmmem,
- mainClass,
- args,
- redirects),
- BootstrapTools.getTaskManagerShellCommand(
- cfg,
- containeredParams,
- "./conf",
- "./logs",
- true,
- true,
- true,
- this.getClass(),
- mainArgs));
+ assertThat(
+ BootstrapTools.getTaskManagerShellCommand(
+ cfg,
+ containeredParams,
+ "./conf",
+ "./logs",
+ true,
+ true,
+ true,
+ this.getClass(),
+ mainArgs))
+ .isEqualTo(
+ String.join(
+ " ",
+ java,
+ logfile,
+ logback,
+ log4j,
+ jvmOpts,
+ tmJvmOpts,
+ BootstrapTools.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5,
+ jvmmem,
+ mainClass,
+ args,
+ redirects));
}
@Test
- public void testUpdateTmpDirectoriesInConfiguration() {
+ void testUpdateTmpDirectoriesInConfiguration() {
Configuration config = new Configuration();
// test that default value is taken
BootstrapTools.updateTmpDirectoriesInConfiguration(config,
"default/directory/path");
- assertEquals(config.getString(CoreOptions.TMP_DIRS),
"default/directory/path");
+
assertThat(config.getString(CoreOptions.TMP_DIRS)).isEqualTo("default/directory/path");
// test that we ignore default value is value is set before
BootstrapTools.updateTmpDirectoriesInConfiguration(config,
"not/default/directory/path");
- assertEquals(config.getString(CoreOptions.TMP_DIRS),
"default/directory/path");
+
assertThat(config.getString(CoreOptions.TMP_DIRS)).isEqualTo("default/directory/path");
// test that empty value is not a magic string
config.setString(CoreOptions.TMP_DIRS, "");
BootstrapTools.updateTmpDirectoriesInConfiguration(config,
"some/new/path");
- assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
+ assertThat(config.getString(CoreOptions.TMP_DIRS)).isEmpty();
}
@Test
- public void
testShouldNotUpdateTmpDirectoriesInConfigurationIfNoValueConfigured() {
+ void testShouldNotUpdateTmpDirectoriesInConfigurationIfNoValueConfigured()
{
Configuration config = new Configuration();
BootstrapTools.updateTmpDirectoriesInConfiguration(config, null);
- assertEquals(config.getString(CoreOptions.TMP_DIRS),
CoreOptions.TMP_DIRS.defaultValue());
+ assertThat(CoreOptions.TMP_DIRS.defaultValue())
+ .isEqualTo(config.getString(CoreOptions.TMP_DIRS));
}
@Test
- public void testGetDynamicPropertiesAsString() {
+ void testGetDynamicPropertiesAsString() {
final Configuration baseConfig = new Configuration();
baseConfig.setString("key.a", "a");
baseConfig.setString("key.b", "b1");
@@ -590,67 +595,67 @@ public class BootstrapToolsTest extends TestLogger {
final String dynamicProperties =
BootstrapTools.getDynamicPropertiesAsString(baseConfig,
targetConfig);
if (OperatingSystem.isWindows()) {
- assertEquals("-Dkey.b=\"b2\" -Dkey.c=\"c\"", dynamicProperties);
+ assertThat(dynamicProperties).isEqualTo("-Dkey.b=\"b2\"
-Dkey.c=\"c\"");
} else {
- assertEquals("-Dkey.b='b2' -Dkey.c='c'", dynamicProperties);
+ assertThat(dynamicProperties).isEqualTo("-Dkey.b='b2'
-Dkey.c='c'");
}
}
@Test
- public void testEscapeDynamicPropertyValueWithSingleQuote() {
+ void testEscapeDynamicPropertyValueWithSingleQuote() {
final String value1 = "#a,b&c^d*e@f(g!h";
- assertEquals("'" + value1 + "'",
BootstrapTools.escapeWithSingleQuote(value1));
+ assertThat(BootstrapTools.escapeWithSingleQuote(value1)).isEqualTo("'"
+ value1 + "'");
final String value2 = "'foobar";
- assertEquals("''\\''foobar'",
BootstrapTools.escapeWithSingleQuote(value2));
+
assertThat(BootstrapTools.escapeWithSingleQuote(value2)).isEqualTo("''\\''foobar'");
final String value3 = "foo''bar";
- assertEquals("'foo'\\'''\\''bar'",
BootstrapTools.escapeWithSingleQuote(value3));
+
assertThat(BootstrapTools.escapeWithSingleQuote(value3)).isEqualTo("'foo'\\'''\\''bar'");
final String value4 = "'foo' 'bar'";
- assertEquals("''\\''foo'\\'' '\\''bar'\\'''",
BootstrapTools.escapeWithSingleQuote(value4));
+ assertThat(BootstrapTools.escapeWithSingleQuote(value4))
+ .isEqualTo("''\\''foo'\\'' '\\''bar'\\'''");
}
@Test
- public void testEscapeDynamicPropertyValueWithDoubleQuote() {
+ void testEscapeDynamicPropertyValueWithDoubleQuote() {
final String value1 = "#a,b&c^d*e@f(g!h";
- assertEquals("\"#a,b&c\"^^\"d*e@f(g!h\"",
BootstrapTools.escapeWithDoubleQuote(value1));
+ assertThat(BootstrapTools.escapeWithDoubleQuote(value1))
+ .isEqualTo("\"#a,b&c\"^^\"d*e@f(g!h\"");
final String value2 = "foo\"bar'";
- assertEquals("\"foo\\\"bar'\"",
BootstrapTools.escapeWithDoubleQuote(value2));
+
assertThat(BootstrapTools.escapeWithDoubleQuote(value2)).isEqualTo("\"foo\\\"bar'\"");
final String value3 = "\"foo\" \"bar\"";
- assertEquals("\"\\\"foo\\\" \\\"bar\\\"\"",
BootstrapTools.escapeWithDoubleQuote(value3));
+ assertThat(BootstrapTools.escapeWithDoubleQuote(value3))
+ .isEqualTo("\"\\\"foo\\\" \\\"bar\\\"\"");
}
@Test
- public void testGetEnvironmentVariables() {
+ void testGetEnvironmentVariables() {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.LD_LIBRARY_PATH",
"/usr/lib/native");
Map<String, String> res =
ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.",
testConf);
- Assert.assertEquals(1, res.size());
- Map.Entry<String, String> entry = res.entrySet().iterator().next();
- Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
- Assert.assertEquals("/usr/lib/native", entry.getValue());
+ assertThat(res).hasSize(1).containsEntry("LD_LIBRARY_PATH",
"/usr/lib/native");
}
@Test
- public void testGetEnvironmentVariablesErroneous() {
+ void testGetEnvironmentVariablesErroneous() {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.", "/usr/lib/native");
Map<String, String> res =
ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.",
testConf);
- Assert.assertEquals(0, res.size());
+ assertThat(res).isEmpty();
}
@Test
- public void testWriteConfigurationAndReload() throws IOException {
- final File flinkConfDir =
temporaryFolder.newFolder().getAbsoluteFile();
+ void testWriteConfigurationAndReload() throws IOException {
+ final File flinkConfDir =
TempDirUtils.newFolder(temporaryFolder).getAbsoluteFile();
final Configuration flinkConfig = new Configuration();
final ConfigOption<List<String>> listStringConfigOption =
@@ -658,7 +663,8 @@ public class BootstrapToolsTest extends TestLogger {
final List<String> list =
Arrays.asList("A,B,C,D", "A'B'C'D", "A;BCD", "AB\"C\"D",
"AB'\"D:B");
flinkConfig.set(listStringConfigOption, list);
- assertThat(flinkConfig.get(listStringConfigOption),
containsInAnyOrder(list.toArray()));
+ assertThat(flinkConfig.get(listStringConfigOption))
+ .containsExactlyInAnyOrderElementsOf(list);
final ConfigOption<List<Duration>> listDurationConfigOption =
ConfigOptions.key("test-list-duration-key")
@@ -668,9 +674,8 @@ public class BootstrapToolsTest extends TestLogger {
final List<Duration> durationList =
Arrays.asList(Duration.ofSeconds(3), Duration.ofMinutes(1));
flinkConfig.set(listDurationConfigOption, durationList);
- assertThat(
- flinkConfig.get(listDurationConfigOption),
- containsInAnyOrder(durationList.toArray()));
+ assertThat(flinkConfig.get(listDurationConfigOption))
+ .containsExactlyInAnyOrderElementsOf(durationList);
final ConfigOption<Map<String, String>> mapConfigOption =
ConfigOptions.key("test-map-key").mapType().noDefaultValue();
@@ -681,27 +686,22 @@ public class BootstrapToolsTest extends TestLogger {
map.put("key4", "AB\"C\"D");
map.put("key5", "AB'\"D:B");
flinkConfig.set(mapConfigOption, map);
- assertThat(
- flinkConfig.get(mapConfigOption).entrySet(),
- containsInAnyOrder(map.entrySet().toArray()));
+ assertThat(flinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
final ConfigOption<Duration> durationConfigOption =
ConfigOptions.key("test-duration-key").durationType().noDefaultValue();
final Duration duration = Duration.ofMillis(3000);
flinkConfig.set(durationConfigOption, duration);
- assertEquals(duration, flinkConfig.get(durationConfigOption));
+ assertThat(flinkConfig.get(durationConfigOption)).isEqualTo(duration);
BootstrapTools.writeConfiguration(flinkConfig, new File(flinkConfDir,
FLINK_CONF_FILENAME));
final Configuration loadedFlinkConfig =
GlobalConfiguration.loadConfiguration(flinkConfDir.getAbsolutePath());
- assertThat(
- loadedFlinkConfig.get(listStringConfigOption),
containsInAnyOrder(list.toArray()));
- assertThat(
- loadedFlinkConfig.get(listDurationConfigOption),
- containsInAnyOrder(durationList.toArray()));
- assertThat(
- loadedFlinkConfig.get(mapConfigOption).entrySet(),
- containsInAnyOrder(map.entrySet().toArray()));
- assertEquals(duration, loadedFlinkConfig.get(durationConfigOption));
+ assertThat(loadedFlinkConfig.get(listStringConfigOption))
+ .containsExactlyInAnyOrderElementsOf(list);
+ assertThat(loadedFlinkConfig.get(listDurationConfigOption))
+ .containsExactlyInAnyOrderElementsOf(durationList);
+
assertThat(loadedFlinkConfig.get(mapConfigOption)).containsAllEntriesOf(map);
+
assertThat(loadedFlinkConfig.get(durationConfigOption)).isEqualTo(duration);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 3ea74f6a47f..377cbf02180 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtilsTestBase;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Map;
@@ -39,17 +39,12 @@ import java.util.function.Consumer;
import static
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_LEGACY_HEAP_OPTIONS;
import static
org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link TaskExecutorProcessUtils}. */
-public class TaskExecutorProcessUtilsTest
- extends ProcessMemoryUtilsTestBase<TaskExecutorProcessSpec> {
+class TaskExecutorProcessUtilsTest extends
ProcessMemoryUtilsTestBase<TaskExecutorProcessSpec> {
private static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
private static final MemorySize MANAGED_MEM_SIZE =
MemorySize.parse("200m");
@@ -81,77 +76,72 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testGenerateDynamicConfigurations() {
+ void testGenerateDynamicConfigurations() {
String dynamicConfigsStr =
TaskExecutorProcessUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
Map<String, String> configs =
ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigsStr);
assertThat(
- new
CPUResource(Double.valueOf(configs.get(TaskManagerOptions.CPU_CORES.key()))),
- is(TM_RESOURCE_SPEC.getCpuCores()));
+ new CPUResource(
+ Double.parseDouble(
+
configs.get(TaskManagerOptions.CPU_CORES.key()))))
+ .isEqualTo(TM_RESOURCE_SPEC.getCpuCores());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getFrameworkHeapSize());
assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
- is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+ MemorySize.parse(
+
configs.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getTaskHeapSize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getTaskOffHeapSize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MAX.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getNetworkMemSize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MIN.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getNetworkMemSize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getManagedMemorySize());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_METASPACE.key())))
+
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getMetaspace());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MIN.key())))
+
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead());
+
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MAX.key())))
+
.isEqualTo(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead());
+
assertThat(Integer.valueOf(configs.get(TaskManagerOptions.NUM_TASK_SLOTS.key())))
+ .isEqualTo(TM_RESOURCE_SPEC.getNumSlots());
+
assertThat(configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()))
+ .isEqualTo(
+ '"'
+ + String.join(";",
TM_RESOURCE_SPEC.getExtendedResources().keySet())
+ + '"');
assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key())),
- is(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
- is(TM_RESOURCE_SPEC.getTaskHeapSize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
- is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MAX.key())),
- is(TM_RESOURCE_SPEC.getNetworkMemSize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.NETWORK_MEMORY_MIN.key())),
- is(TM_RESOURCE_SPEC.getNetworkMemSize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
- is(TM_RESOURCE_SPEC.getManagedMemorySize()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.JVM_METASPACE.key())),
-
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getMetaspace()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MIN.key())),
-
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead()));
- assertThat(
-
MemorySize.parse(configs.get(TaskManagerOptions.JVM_OVERHEAD_MAX.key())),
-
is(TM_RESOURCE_SPEC.getJvmMetaspaceAndOverhead().getOverhead()));
- assertThat(
-
Integer.valueOf(configs.get(TaskManagerOptions.NUM_TASK_SLOTS.key())),
- is(TM_RESOURCE_SPEC.getNumSlots()));
- assertThat(
-
configs.get(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key()),
- is('"' + String.join(";",
TM_RESOURCE_SPEC.getExtendedResources().keySet()) + '"'));
- assertThat(
- configs.get(
-
ExternalResourceOptions.getAmountConfigOptionForResource(
- EXTERNAL_RESOURCE_NAME_1)),
- is(
+ configs.get(
+
ExternalResourceOptions.getAmountConfigOptionForResource(
+ EXTERNAL_RESOURCE_NAME_1)))
+ .isEqualTo(
String.valueOf(
TM_RESOURCE_SPEC
.getExtendedResources()
.get(EXTERNAL_RESOURCE_NAME_1)
.getValue()
- .longValue())));
+ .longValue()));
assertThat(
- configs.get(
-
ExternalResourceOptions.getAmountConfigOptionForResource(
- EXTERNAL_RESOURCE_NAME_2)),
- is(
+ configs.get(
+
ExternalResourceOptions.getAmountConfigOptionForResource(
+ EXTERNAL_RESOURCE_NAME_2)))
+ .isEqualTo(
String.valueOf(
TM_RESOURCE_SPEC
.getExtendedResources()
.get(EXTERNAL_RESOURCE_NAME_2)
.getValue()
- .longValue())));
+ .longValue()));
}
@Test
- public void testProcessSpecFromWorkerResourceSpec() {
+ void testProcessSpecFromWorkerResourceSpec() {
final WorkerResourceSpec workerResourceSpec =
new WorkerResourceSpec.Builder()
.setCpuCores(1.0)
@@ -165,26 +155,24 @@ public class TaskExecutorProcessUtilsTest
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
new Configuration(), workerResourceSpec);
- assertEquals(workerResourceSpec.getCpuCores(),
taskExecutorProcessSpec.getCpuCores());
- assertEquals(
- workerResourceSpec.getTaskHeapSize(),
taskExecutorProcessSpec.getTaskHeapSize());
- assertEquals(
- workerResourceSpec.getTaskOffHeapSize(),
- taskExecutorProcessSpec.getTaskOffHeapSize());
- assertEquals(
- workerResourceSpec.getNetworkMemSize(),
- taskExecutorProcessSpec.getNetworkMemSize());
- assertEquals(
- workerResourceSpec.getManagedMemSize(),
- taskExecutorProcessSpec.getManagedMemorySize());
- assertEquals(workerResourceSpec.getNumSlots(),
taskExecutorProcessSpec.getNumSlots());
- assertEquals(
- workerResourceSpec.getExtendedResources(),
- taskExecutorProcessSpec.getExtendedResources());
+ assertThat(taskExecutorProcessSpec.getCpuCores())
+ .isEqualTo(workerResourceSpec.getCpuCores());
+ assertThat(taskExecutorProcessSpec.getTaskHeapSize())
+ .isEqualTo(workerResourceSpec.getTaskHeapSize());
+ assertThat(taskExecutorProcessSpec.getTaskOffHeapSize())
+ .isEqualTo(workerResourceSpec.getTaskOffHeapSize());
+ assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+ .isEqualTo(workerResourceSpec.getNetworkMemSize());
+ assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+ .isEqualTo(workerResourceSpec.getManagedMemSize());
+ assertThat(taskExecutorProcessSpec.getNumSlots())
+ .isEqualTo(workerResourceSpec.getNumSlots());
+ assertThat(taskExecutorProcessSpec.getExtendedResources())
+ .isEqualTo(workerResourceSpec.getExtendedResources());
}
@Test
- public void testConfigCpuCores() {
+ void testConfigCpuCores() {
final double cpuCores = 1.0;
Configuration conf = new Configuration();
@@ -193,31 +181,30 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getCpuCores(),
- is(new CPUResource(cpuCores))));
+ assertThat(taskExecutorProcessSpec.getCpuCores())
+ .isEqualTo(new CPUResource(cpuCores)));
}
@Test
- public void testConfigNoCpuCores() {
+ void testConfigNoCpuCores() {
Configuration conf = new Configuration();
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
validateInAllConfigurations(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getCpuCores(), is(new
CPUResource(3.0))));
+ assertThat(taskExecutorProcessSpec.getCpuCores())
+ .isEqualTo(new CPUResource(3.0)));
}
@Test
- public void testConfigNegativeCpuCores() {
+ void testConfigNegativeCpuCores() {
Configuration conf = new Configuration();
conf.setDouble(TaskManagerOptions.CPU_CORES, -0.1f);
validateFailInAllConfigurations(conf);
}
@Test
- public void testConfigFrameworkHeapMemory() {
+ void testConfigFrameworkHeapMemory() {
final MemorySize frameworkHeapSize = MemorySize.parse("100m");
Configuration conf = new Configuration();
@@ -226,13 +213,12 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getFrameworkHeapSize(),
- is(frameworkHeapSize)));
+
assertThat(taskExecutorProcessSpec.getFrameworkHeapSize())
+ .isEqualTo(frameworkHeapSize));
}
@Test
- public void testConfigFrameworkOffHeapMemory() {
+ void testConfigFrameworkOffHeapMemory() {
final MemorySize frameworkOffHeapSize = MemorySize.parse("10m");
Configuration conf = new Configuration();
@@ -241,13 +227,12 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec ->
- assertThat(
-
taskExecutorProcessSpec.getFrameworkOffHeapMemorySize(),
- is(frameworkOffHeapSize)));
+
assertThat(taskExecutorProcessSpec.getFrameworkOffHeapMemorySize())
+ .isEqualTo(frameworkOffHeapSize));
}
@Test
- public void testConfigTaskHeapMemory() {
+ void testConfigTaskHeapMemory() {
final MemorySize taskHeapSize = MemorySize.parse("50m");
Configuration conf = new Configuration();
@@ -258,11 +243,12 @@ public class TaskExecutorProcessUtilsTest
validateInConfigurationsWithoutExplicitTaskHeapMem(
conf,
taskExecutorProcessSpec ->
- assertThat(taskExecutorProcessSpec.getTaskHeapSize(),
is(taskHeapSize)));
+ assertThat(taskExecutorProcessSpec.getTaskHeapSize())
+ .isEqualTo(taskHeapSize));
}
@Test
- public void testConfigTaskOffheapMemory() {
+ void testConfigTaskOffheapMemory() {
final MemorySize taskOffHeapSize = MemorySize.parse("50m");
Configuration conf = new Configuration();
@@ -271,12 +257,12 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getTaskOffHeapSize(),
is(taskOffHeapSize)));
+
assertThat(taskExecutorProcessSpec.getTaskOffHeapSize())
+ .isEqualTo(taskOffHeapSize));
}
@Test
- public void testConfigNetworkMemoryRange() {
+ void testConfigNetworkMemoryRange() {
final MemorySize networkMin = MemorySize.parse("200m");
final MemorySize networkMax = MemorySize.parse("500m");
@@ -287,18 +273,15 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec -> {
- assertThat(
-
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
- greaterThanOrEqualTo(networkMin.getBytes()));
- assertThat(
-
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
- lessThanOrEqualTo(networkMax.getBytes()));
+
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+ .isGreaterThanOrEqualTo(networkMin.getBytes());
+
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+ .isLessThanOrEqualTo(networkMax.getBytes());
});
}
@Test
- public void
-
testConsistencyCheckOfDerivedNetworkMemoryWithinMinMaxRangeNotMatchingFractionPasses()
{
+ void
testConsistencyCheckOfDerivedNetworkMemoryWithinMinMaxRangeNotMatchingFractionPasses()
{
final Configuration configuration =
setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(400);
// set fraction to be extremely low to not match the derived network
memory
@@ -307,27 +290,30 @@ public class TaskExecutorProcessUtilsTest
TaskExecutorProcessUtils.processSpecFromConfig(configuration);
}
- @Test(expected = IllegalConfigurationException.class)
+ @Test
public void testConsistencyCheckOfDerivedNetworkMemoryLessThanMinFails() {
final Configuration configuration =
setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN,
MemorySize.parse("900m"));
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX,
MemorySize.parse("1000m"));
+
// internal validation should fail
- TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+ assertThatExceptionOfType(IllegalConfigurationException.class)
+ .isThrownBy(() ->
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
}
- @Test(expected = IllegalConfigurationException.class)
+ @Test
public void
testConsistencyCheckOfDerivedNetworkMemoryGreaterThanMaxFails() {
final Configuration configuration =
setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN,
MemorySize.parse("100m"));
configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX,
MemorySize.parse("150m"));
// internal validation should fail
- TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+ assertThatExceptionOfType(IllegalConfigurationException.class)
+ .isThrownBy(() ->
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
}
- @Test(expected = IllegalConfigurationException.class)
+ @Test
public void
testConsistencyCheckOfDerivedNetworkMemoryDoesNotMatchLegacyConfigFails() {
final int numberOfNetworkBuffers = 10;
final int pageSizeMb = 16;
@@ -340,7 +326,8 @@ public class TaskExecutorProcessUtilsTest
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS,
numberOfNetworkBuffers);
// internal validation should fail
- TaskExecutorProcessUtils.processSpecFromConfig(configuration);
+ assertThatExceptionOfType(IllegalConfigurationException.class)
+ .isThrownBy(() ->
TaskExecutorProcessUtils.processSpecFromConfig(configuration));
}
private static Configuration
setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(
@@ -376,15 +363,14 @@ public class TaskExecutorProcessUtilsTest
final TaskExecutorProcessSpec adjusteedTaskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(conf);
- assertThat(
-
adjusteedTaskExecutorProcessSpec.getNetworkMemSize().getMebiBytes(),
- is(networkMemorySizeToDeriveMb));
+
assertThat(adjusteedTaskExecutorProcessSpec.getNetworkMemSize().getMebiBytes())
+ .isEqualTo(networkMemorySizeToDeriveMb);
return conf;
}
@Test
- public void testConfigNetworkMemoryRangeFailure() {
+ void testConfigNetworkMemoryRangeFailure() {
final MemorySize networkMin = MemorySize.parse("200m");
final MemorySize networkMax = MemorySize.parse("50m");
@@ -396,7 +382,7 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testConfigNetworkMemoryFraction() {
+ void testConfigNetworkMemoryFraction() {
final MemorySize networkMin = MemorySize.ZERO;
final MemorySize networkMax = MemorySize.parse("1t");
final float fraction = 0.2f;
@@ -413,16 +399,15 @@ public class TaskExecutorProcessUtilsTest
validateInConfigWithExplicitTaskHeapAndManagedMem(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getNetworkMemSize(),
- is(
+ assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+ .isEqualTo(
taskExecutorProcessSpec
.getTotalFlinkMemorySize()
- .multiply(fraction))));
+ .multiply(fraction)));
}
@Test
- public void testConfigNetworkMemoryFractionFailure() {
+ void testConfigNetworkMemoryFractionFailure() {
Configuration conf = new Configuration();
conf.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, -0.1f);
validateFailInAllConfigurations(conf);
@@ -432,7 +417,7 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testConfigNetworkMemoryLegacyRangeFraction() {
+ void testConfigNetworkMemoryLegacyRangeFraction() {
final MemorySize networkMin = MemorySize.parse("200m");
final MemorySize networkMax = MemorySize.parse("500m");
@@ -455,12 +440,10 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
taskExecutorProcessSpec -> {
- assertThat(
-
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
- greaterThanOrEqualTo(networkMin.getBytes()));
- assertThat(
-
taskExecutorProcessSpec.getNetworkMemSize().getBytes(),
- lessThanOrEqualTo(networkMax.getBytes()));
+
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+ .isGreaterThanOrEqualTo(networkMin.getBytes());
+
assertThat(taskExecutorProcessSpec.getNetworkMemSize().getBytes())
+ .isLessThanOrEqualTo(networkMax.getBytes());
});
conf.setString(legacyOptionMin, "0m");
@@ -470,16 +453,15 @@ public class TaskExecutorProcessUtilsTest
validateInConfigWithExplicitTaskHeapAndManagedMem(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getNetworkMemSize(),
- is(
+ assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+ .isEqualTo(
taskExecutorProcessSpec
.getTotalFlinkMemorySize()
- .multiply(fraction))));
+ .multiply(fraction)));
}
@Test
- public void testConfigNetworkMemoryLegacyNumOfBuffers() {
+ void testConfigNetworkMemoryLegacyNumOfBuffers() {
final MemorySize pageSize = MemorySize.parse("32k");
final int numOfBuffers = 1024;
final MemorySize networkSize = pageSize.multiply(numOfBuffers);
@@ -499,15 +481,17 @@ public class TaskExecutorProcessUtilsTest
validateInConfigWithExplicitTaskHeapAndManagedMem(
conf,
taskExecutorProcessSpec ->
-
assertThat(taskExecutorProcessSpec.getNetworkMemSize(), is(networkSize)));
+ assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+ .isEqualTo(networkSize));
validateInConfigurationsWithoutExplicitTaskHeapMem(
conf,
taskExecutorProcessSpec ->
-
assertThat(taskExecutorProcessSpec.getNetworkMemSize(), is(networkSize)));
+ assertThat(taskExecutorProcessSpec.getNetworkMemSize())
+ .isEqualTo(networkSize));
}
@Test
- public void testConfigManagedMemorySize() {
+ void testConfigManagedMemorySize() {
final MemorySize managedMemSize = MemorySize.parse("100m");
Configuration conf = new Configuration();
@@ -518,13 +502,12 @@ public class TaskExecutorProcessUtilsTest
validateInConfigurationsWithoutExplicitManagedMem(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getManagedMemorySize(),
- is(managedMemSize)));
+
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+ .isEqualTo(managedMemSize));
}
@Test
- public void testConfigManagedMemoryLegacySize() {
+ void testConfigManagedMemoryLegacySize() {
final MemorySize managedMemSize = MemorySize.parse("100m");
@SuppressWarnings("deprecation")
@@ -538,13 +521,12 @@ public class TaskExecutorProcessUtilsTest
validateInConfigurationsWithoutExplicitManagedMem(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getManagedMemorySize(),
- is(managedMemSize)));
+
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+ .isEqualTo(managedMemSize));
}
@Test
- public void testConfigManagedMemoryFraction() {
+ void testConfigManagedMemoryFraction() {
final float fraction = 0.5f;
Configuration conf = new Configuration();
@@ -555,16 +537,15 @@ public class TaskExecutorProcessUtilsTest
validateInConfigurationsWithoutExplicitManagedMem(
conf,
taskExecutorProcessSpec ->
- assertThat(
- taskExecutorProcessSpec.getManagedMemorySize(),
- is(
+
assertThat(taskExecutorProcessSpec.getManagedMemorySize())
+ .isEqualTo(
taskExecutorProcessSpec
.getTotalFlinkMemorySize()
- .multiply(fraction))));
+ .multiply(fraction)));
}
@Test
- public void testConfigManagedMemoryFractionFailure() {
+ void testConfigManagedMemoryFractionFailure() {
final Configuration conf = new Configuration();
conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, -0.1f);
validateFailInConfigurationsWithoutExplicitManagedMem(conf);
@@ -574,7 +555,7 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testFlinkInternalMemorySizeAddUpFailure() {
+ void testFlinkInternalMemorySizeAddUpFailure() {
final MemorySize totalFlinkMemory = MemorySize.parse("499m");
final MemorySize frameworkHeap = MemorySize.parse("100m");
final MemorySize taskHeap = MemorySize.parse("100m");
@@ -595,7 +576,7 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testFlinkInternalMemoryFractionAddUpFailure() {
+ void testFlinkInternalMemoryFractionAddUpFailure() {
final float networkFraction = 0.6f;
final float managedFraction = 0.6f;
@@ -610,7 +591,7 @@ public class TaskExecutorProcessUtilsTest
}
@Test
- public void testConfigTotalProcessMemoryLegacySize() {
+ void testConfigTotalProcessMemoryLegacySize() {
final MemorySize totalProcessMemorySize = MemorySize.parse("2g");
@SuppressWarnings("deprecation")
@@ -621,25 +602,23 @@ public class TaskExecutorProcessUtilsTest
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(conf);
- assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize(),
is(totalProcessMemorySize));
+ assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize())
+ .isEqualTo(totalProcessMemorySize);
}
@Test
public void testExceptionShouldContainRequiredConfigOptions() {
- try {
- TaskExecutorProcessUtils.processSpecFromConfig(new
Configuration());
- } catch (final IllegalConfigurationException e) {
- assertThat(e.getMessage(),
containsString(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
- assertThat(
- e.getMessage(),
containsString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
- assertThat(e.getMessage(),
containsString(TaskManagerOptions.TOTAL_FLINK_MEMORY.key()));
- assertThat(
- e.getMessage(),
containsString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
- }
+ assertThatThrownBy(
+ () ->
TaskExecutorProcessUtils.processSpecFromConfig(new Configuration()))
+ .isInstanceOf(IllegalConfigurationException.class)
+
.hasMessageContaining(TaskManagerOptions.TASK_HEAP_MEMORY.key())
+
.hasMessageContaining(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())
+
.hasMessageContaining(TaskManagerOptions.TOTAL_FLINK_MEMORY.key())
+
.hasMessageContaining(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key());
}
@Test
- public void testConfigNumSlots() {
+ void testConfigNumSlots() {
final int numSlots = 5;
Configuration conf = new Configuration();
@@ -647,13 +626,12 @@ public class TaskExecutorProcessUtilsTest
validateInAllConfigurations(
conf,
- taskExecutorProcessSpec -> {
- assertThat(taskExecutorProcessSpec.getNumSlots(),
is(numSlots));
- });
+ taskExecutorProcessSpec ->
+
assertThat(taskExecutorProcessSpec.getNumSlots()).isEqualTo(numSlots));
}
@Test
- public void testProcessSpecFromConfigWithExternalResource() {
+ void testProcessSpecFromConfigWithExternalResource() {
final Configuration config = new Configuration();
config.setString(
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(),
EXTERNAL_RESOURCE_NAME_1);
@@ -663,14 +641,14 @@ public class TaskExecutorProcessUtilsTest
config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(4096));
final TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getExtendedResources().size(),
is(1));
+ assertThat(taskExecutorProcessSpec.getExtendedResources()).hasSize(1);
assertThat(
- taskExecutorProcessSpec
- .getExtendedResources()
- .get(EXTERNAL_RESOURCE_NAME_1)
- .getValue()
- .longValue(),
- is(1L));
+ taskExecutorProcessSpec
+ .getExtendedResources()
+ .get(EXTERNAL_RESOURCE_NAME_1)
+ .getValue()
+ .longValue())
+ .isOne();
}
@Override
@@ -720,8 +698,8 @@ public class TaskExecutorProcessUtilsTest
config.addAll(customConfig);
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getTaskHeapSize(),
is(TASK_HEAP_SIZE));
- assertThat(taskExecutorProcessSpec.getManagedMemorySize(),
is(MANAGED_MEM_SIZE));
+
assertThat(taskExecutorProcessSpec.getTaskHeapSize()).isEqualTo(TASK_HEAP_SIZE);
+
assertThat(taskExecutorProcessSpec.getManagedMemorySize()).isEqualTo(MANAGED_MEM_SIZE);
validateFunc.accept(taskExecutorProcessSpec);
}
@@ -741,7 +719,8 @@ public class TaskExecutorProcessUtilsTest
config.addAll(customConfig);
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(),
is(TOTAL_FLINK_MEM_SIZE));
+ assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+ .isEqualTo(TOTAL_FLINK_MEM_SIZE);
validateFunc.accept(taskExecutorProcessSpec);
}
@@ -760,8 +739,9 @@ public class TaskExecutorProcessUtilsTest
config.addAll(customConfig);
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(),
is(TOTAL_FLINK_MEM_SIZE));
- assertThat(taskExecutorProcessSpec.getTaskHeapSize(),
is(TASK_HEAP_SIZE));
+ assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+ .isEqualTo(TOTAL_FLINK_MEM_SIZE);
+
assertThat(taskExecutorProcessSpec.getTaskHeapSize()).isEqualTo(TASK_HEAP_SIZE);
validateFunc.accept(taskExecutorProcessSpec);
}
@@ -781,8 +761,9 @@ public class TaskExecutorProcessUtilsTest
config.addAll(customConfig);
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize(),
is(TOTAL_FLINK_MEM_SIZE));
- assertThat(taskExecutorProcessSpec.getManagedMemorySize(),
is(MANAGED_MEM_SIZE));
+ assertThat(taskExecutorProcessSpec.getTotalFlinkMemorySize())
+ .isEqualTo(TOTAL_FLINK_MEM_SIZE);
+
assertThat(taskExecutorProcessSpec.getManagedMemorySize()).isEqualTo(MANAGED_MEM_SIZE);
validateFunc.accept(taskExecutorProcessSpec);
}
@@ -802,7 +783,8 @@ public class TaskExecutorProcessUtilsTest
config.addAll(customConfig);
TaskExecutorProcessSpec taskExecutorProcessSpec =
TaskExecutorProcessUtils.processSpecFromConfig(config);
- assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize(),
is(TOTAL_PROCESS_MEM_SIZE));
+ assertThat(taskExecutorProcessSpec.getTotalProcessMemorySize())
+ .isEqualTo(TOTAL_PROCESS_MEM_SIZE);
validateFunc.accept(taskExecutorProcessSpec);
}
@@ -815,12 +797,9 @@ public class TaskExecutorProcessUtilsTest
@Override
protected void validateFail(final Configuration config) {
- try {
- TaskExecutorProcessUtils.processSpecFromConfig(config);
- fail("Configuration did not fail as expected.");
- } catch (IllegalConfigurationException e) {
- // expected
- }
+ assertThatExceptionOfType(IllegalConfigurationException.class)
+ .as("Configuration did not fail as expected.")
+ .isThrownBy(() ->
TaskExecutorProcessUtils.processSpecFromConfig(config));
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
index e448ff0b567..c79c47c1238 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceBudgetManagerTest.java
@@ -20,51 +20,49 @@ package org.apache.flink.runtime.clusterframework.types;
import org.apache.flink.configuration.MemorySize;
-import org.hamcrest.Matchers;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ResourceBudgetManager}. */
public class ResourceBudgetManagerTest {
@Test
- public void testReserve() {
+ void testReserve() {
ResourceBudgetManager budgetManager =
new ResourceBudgetManager(createResourceProfile(1.0, 100));
- assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)),
Matchers.is(true));
- assertThat(budgetManager.getAvailableBudget(),
is(createResourceProfile(0.3, 30)));
+ assertThat(budgetManager.reserve(createResourceProfile(0.7,
70))).isEqualTo(true);
+
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.3,
30));
}
@Test
- public void testReserveFail() {
+ void testReserveFail() {
ResourceBudgetManager budgetManager =
new ResourceBudgetManager(createResourceProfile(1.0, 100));
- assertThat(budgetManager.reserve(createResourceProfile(1.2, 120)),
Matchers.is(false));
- assertThat(budgetManager.getAvailableBudget(),
is(createResourceProfile(1.0, 100)));
+ assertThat(budgetManager.reserve(createResourceProfile(1.2,
120))).isEqualTo(false);
+
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(1.0,
100));
}
@Test
- public void testRelease() {
+ void testRelease() {
ResourceBudgetManager budgetManager =
new ResourceBudgetManager(createResourceProfile(1.0, 100));
- assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)),
Matchers.is(true));
- assertThat(budgetManager.release(createResourceProfile(0.5, 50)),
Matchers.is(true));
- assertThat(budgetManager.getAvailableBudget(),
is(createResourceProfile(0.8, 80)));
+ assertThat(budgetManager.reserve(createResourceProfile(0.7,
70))).isEqualTo(true);
+ assertThat(budgetManager.release(createResourceProfile(0.5,
50))).isEqualTo(true);
+
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.8,
80));
}
@Test
- public void testReleaseFail() {
+ void testReleaseFail() {
ResourceBudgetManager budgetManager =
new ResourceBudgetManager(createResourceProfile(1.0, 100));
- assertThat(budgetManager.reserve(createResourceProfile(0.7, 70)),
Matchers.is(true));
- assertThat(budgetManager.release(createResourceProfile(0.8, 80)),
Matchers.is(false));
- assertThat(budgetManager.getAvailableBudget(),
is(createResourceProfile(0.3, 30)));
+ assertThat(budgetManager.reserve(createResourceProfile(0.7,
70))).isEqualTo(true);
+ assertThat(budgetManager.release(createResourceProfile(0.8,
80))).isEqualTo(false);
+
assertThat(budgetManager.getAvailableBudget()).isEqualTo(createResourceProfile(0.3,
30));
}
private static ResourceProfile createResourceProfile(double cpus, int
memory) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index e760f2e2008..ca2a10d97e4 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -23,35 +23,25 @@ import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.resources.ExternalResource;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
-import org.hamcrest.Matcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static
org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_CPU_CORE_NUMBER_TO_LOG;
import static
org.apache.flink.runtime.clusterframework.types.ResourceProfile.MAX_MEMORY_SIZE_TO_LOG;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/** Tests for the {@link ResourceProfile}. */
-public class ResourceProfileTest extends TestLogger {
+class ResourceProfileTest {
private static final MemorySize TOO_LARGE_MEMORY =
MAX_MEMORY_SIZE_TO_LOG.add(MemorySize.ofMebiBytes(10));
private static final String EXTERNAL_RESOURCE_NAME = "gpu";
@Test
- public void testAllFieldsNoLessThanProfile() {
+ void testAllFieldsNoLessThanProfile() {
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -81,19 +71,19 @@ public class ResourceProfileTest extends TestLogger {
.setManagedMemoryMB(200)
.build();
- assertFalse(rp1.allFieldsNoLessThan(rp2));
- assertTrue(rp2.allFieldsNoLessThan(rp1));
+ assertThat(rp1.allFieldsNoLessThan(rp2)).isFalse();
+ assertThat(rp2.allFieldsNoLessThan(rp1)).isTrue();
- assertFalse(rp1.allFieldsNoLessThan(rp3));
- assertTrue(rp3.allFieldsNoLessThan(rp1));
+ assertThat(rp1.allFieldsNoLessThan(rp3)).isFalse();
+ assertThat(rp3.allFieldsNoLessThan(rp1)).isTrue();
- assertFalse(rp2.allFieldsNoLessThan(rp3));
- assertFalse(rp3.allFieldsNoLessThan(rp2));
+ assertThat(rp2.allFieldsNoLessThan(rp3)).isFalse();
+ assertThat(rp3.allFieldsNoLessThan(rp2)).isFalse();
- assertTrue(rp4.allFieldsNoLessThan(rp1));
- assertTrue(rp4.allFieldsNoLessThan(rp2));
- assertTrue(rp4.allFieldsNoLessThan(rp3));
- assertTrue(rp4.allFieldsNoLessThan(rp4));
+ assertThat(rp4.allFieldsNoLessThan(rp1)).isTrue();
+ assertThat(rp4.allFieldsNoLessThan(rp2)).isTrue();
+ assertThat(rp4.allFieldsNoLessThan(rp3)).isTrue();
+ assertThat(rp4.allFieldsNoLessThan(rp4)).isTrue();
final ResourceProfile rp5 =
ResourceProfile.newBuilder()
@@ -103,7 +93,7 @@ public class ResourceProfileTest extends TestLogger {
.setManagedMemoryMB(100)
.setNetworkMemoryMB(100)
.build();
- assertFalse(rp4.allFieldsNoLessThan(rp5));
+ assertThat(rp4.allFieldsNoLessThan(rp5)).isFalse();
ResourceSpec rs1 =
ResourceSpec.newBuilder(1.0, 100)
@@ -114,22 +104,24 @@ public class ResourceProfileTest extends TestLogger {
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1))
.build();
-
assertFalse(rp1.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
- assertTrue(
- ResourceProfile.fromResourceSpec(rs1)
-
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs2)));
- assertFalse(
- ResourceProfile.fromResourceSpec(rs2)
-
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
+
assertThat(rp1.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1))).isFalse();
+ assertThat(
+ ResourceProfile.fromResourceSpec(rs1)
+
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs2)))
+ .isTrue();
+ assertThat(
+ ResourceProfile.fromResourceSpec(rs2)
+
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)))
+ .isFalse();
}
@Test
- public void testUnknownNoLessThanUnknown() {
-
assertTrue(ResourceProfile.UNKNOWN.allFieldsNoLessThan(ResourceProfile.UNKNOWN));
+ void testUnknownNoLessThanUnknown() {
+
assertThat(ResourceProfile.UNKNOWN.allFieldsNoLessThan(ResourceProfile.UNKNOWN)).isTrue();
}
@Test
- public void testMatchRequirement() {
+ void testMatchRequirement() {
final ResourceProfile resource1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -162,20 +154,21 @@ public class ResourceProfileTest extends TestLogger {
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
.build();
- assertTrue(resource1.isMatching(requirement1));
- assertTrue(resource1.isMatching(requirement2));
- assertFalse(resource1.isMatching(requirement3));
+ assertThat(resource1.isMatching(requirement1)).isTrue();
+ assertThat(resource1.isMatching(requirement2)).isTrue();
+ assertThat(resource1.isMatching(requirement3)).isFalse();
- assertTrue(resource2.isMatching(requirement1));
- assertFalse(resource2.isMatching(requirement2));
- assertTrue(resource2.isMatching(requirement3));
+ assertThat(resource2.isMatching(requirement1)).isTrue();
+ assertThat(resource2.isMatching(requirement2)).isFalse();
+ assertThat(resource2.isMatching(requirement3)).isTrue();
}
@Test
- public void testEquals() {
+ void testEquals() {
ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).build();
- assertEquals(ResourceProfile.fromResourceSpec(rs1),
ResourceProfile.fromResourceSpec(rs2));
+ assertThat(ResourceProfile.fromResourceSpec(rs2))
+ .isEqualTo(ResourceProfile.fromResourceSpec(rs1));
ResourceSpec rs3 =
ResourceSpec.newBuilder(1.0, 100)
@@ -185,17 +178,16 @@ public class ResourceProfileTest extends TestLogger {
ResourceSpec.newBuilder(1.0, 100)
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.1))
.build();
- assertNotEquals(
- ResourceProfile.fromResourceSpec(rs3),
ResourceProfile.fromResourceSpec(rs4));
+ assertThat(ResourceProfile.fromResourceSpec(rs4))
+ .isNotEqualTo(ResourceProfile.fromResourceSpec(rs3));
ResourceSpec rs5 =
ResourceSpec.newBuilder(1.0, 100)
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 2.2))
.build();
MemorySize networkMemory = MemorySize.ofMebiBytes(100);
- assertEquals(
- ResourceProfile.fromResourceSpec(rs3, networkMemory),
- ResourceProfile.fromResourceSpec(rs5, networkMemory));
+ assertThat(ResourceProfile.fromResourceSpec(rs5, networkMemory))
+ .isEqualTo(ResourceProfile.fromResourceSpec(rs3,
networkMemory));
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
@@ -262,33 +254,32 @@ public class ResourceProfileTest extends TestLogger {
.setNetworkMemoryMB(100)
.build();
- assertNotEquals(rp1, rp2);
- assertNotEquals(rp1, rp3);
- assertNotEquals(rp1, rp4);
- assertNotEquals(rp1, rp5);
- assertNotEquals(rp1, rp6);
- assertNotEquals(rp1, rp7);
- assertEquals(rp1, rp8);
+ assertThat(rp2).isNotEqualTo(rp1);
+ assertThat(rp3).isNotEqualTo(rp1);
+ assertThat(rp4).isNotEqualTo(rp1);
+ assertThat(rp5).isNotEqualTo(rp1);
+ assertThat(rp6).isNotEqualTo(rp1);
+ assertThat(rp7).isNotEqualTo(rp1);
+ assertThat(rp8).isEqualTo(rp1);
}
@Test
- public void testGet() {
+ void testGet() {
ResourceSpec rs =
ResourceSpec.newBuilder(1.0, 100)
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6))
.build();
ResourceProfile rp = ResourceProfile.fromResourceSpec(rs,
MemorySize.ofMebiBytes(50));
- assertEquals(new CPUResource(1.0), rp.getCpuCores());
- assertEquals(150, rp.getTotalMemory().getMebiBytes());
- assertEquals(100, rp.getOperatorsMemory().getMebiBytes());
- assertEquals(
- new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6),
- rp.getExtendedResources().get(EXTERNAL_RESOURCE_NAME));
+ assertThat(rp.getCpuCores()).isEqualTo(new CPUResource(1.0));
+ assertThat(rp.getTotalMemory().getMebiBytes()).isEqualTo(150);
+ assertThat(rp.getOperatorsMemory().getMebiBytes()).isEqualTo(100);
+ assertThat(rp.getExtendedResources().get(EXTERNAL_RESOURCE_NAME))
+ .isEqualTo(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1.6));
}
@Test
- public void testMerge() {
+ void testMerge() {
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -334,22 +325,22 @@ public class ResourceProfileTest extends TestLogger {
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 4.0))
.build();
- assertEquals(rp1MergeRp1, rp1.merge(rp1));
- assertEquals(rp1MergeRp2, rp1.merge(rp2));
- assertEquals(rp1MergeRp2, rp2.merge(rp1));
- assertEquals(rp2MergeRp2, rp2.merge(rp2));
+ assertThat(rp1.merge(rp1)).isEqualTo(rp1MergeRp1);
+ assertThat(rp1.merge(rp2)).isEqualTo(rp1MergeRp2);
+ assertThat(rp2.merge(rp1)).isEqualTo(rp1MergeRp2);
+ assertThat(rp2.merge(rp2)).isEqualTo(rp2MergeRp2);
- assertEquals(ResourceProfile.UNKNOWN,
rp1.merge(ResourceProfile.UNKNOWN));
- assertEquals(ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN.merge(rp1));
- assertEquals(
- ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN.merge(ResourceProfile.UNKNOWN));
- assertEquals(ResourceProfile.ANY, rp1.merge(ResourceProfile.ANY));
- assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.merge(rp1));
- assertEquals(ResourceProfile.ANY,
ResourceProfile.ANY.merge(ResourceProfile.ANY));
+
assertThat(rp1.merge(ResourceProfile.UNKNOWN)).isEqualTo(ResourceProfile.UNKNOWN);
+
assertThat(ResourceProfile.UNKNOWN.merge(rp1)).isEqualTo(ResourceProfile.UNKNOWN);
+ assertThat(ResourceProfile.UNKNOWN.merge(ResourceProfile.UNKNOWN))
+ .isEqualTo(ResourceProfile.UNKNOWN);
+
assertThat(rp1.merge(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
+
assertThat(ResourceProfile.ANY.merge(rp1)).isEqualTo(ResourceProfile.ANY);
+
assertThat(ResourceProfile.ANY.merge(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
}
@Test
- public void testMergeWithOverflow() {
+ void testMergeWithOverflow() {
final CPUResource largeDouble = new CPUResource(Double.MAX_VALUE -
1.0);
final MemorySize largeMemory =
MemorySize.MAX_VALUE.subtract(MemorySize.parse("100m"));
@@ -386,11 +377,11 @@ public class ResourceProfileTest extends TestLogger {
} catch (ArithmeticException e) {
exceptions.add(e);
}
- assertEquals(3, exceptions.size());
+ assertThat(exceptions).hasSize(3);
}
@Test
- public void testSubtract() {
+ void testSubtract() {
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -416,28 +407,26 @@ public class ResourceProfileTest extends TestLogger {
.setNetworkMemoryMB(300)
.build();
- assertEquals(rp1, rp3.subtract(rp2));
- assertEquals(rp1, rp2.subtract(rp1));
+ assertThat(rp3.subtract(rp2)).isEqualTo(rp1);
+ assertThat(rp2.subtract(rp1)).isEqualTo(rp1);
- try {
- rp1.subtract(rp2);
- fail("The subtract should failed due to trying to subtract a
larger resource");
- } catch (IllegalArgumentException ex) {
- // Ignore ex.
- }
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .as("The subtract should failed due to trying to subtract a
larger resource")
+ .isThrownBy(() -> rp1.subtract(rp2));
- assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.subtract(rp3));
- assertEquals(ResourceProfile.ANY,
ResourceProfile.ANY.subtract(ResourceProfile.ANY));
- assertEquals(ResourceProfile.ANY, rp3.subtract(ResourceProfile.ANY));
+
assertThat(ResourceProfile.ANY.subtract(rp3)).isEqualTo(ResourceProfile.ANY);
+ assertThat(ResourceProfile.ANY.subtract(ResourceProfile.ANY))
+ .isEqualTo(ResourceProfile.ANY);
+
assertThat(rp3.subtract(ResourceProfile.ANY)).isEqualTo(ResourceProfile.ANY);
- assertEquals(ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN.subtract(rp3));
- assertEquals(ResourceProfile.UNKNOWN,
rp3.subtract(ResourceProfile.UNKNOWN));
- assertEquals(
- ResourceProfile.UNKNOWN,
ResourceProfile.UNKNOWN.subtract(ResourceProfile.UNKNOWN));
+
assertThat(ResourceProfile.UNKNOWN.subtract(rp3)).isEqualTo(ResourceProfile.UNKNOWN);
+
assertThat(rp3.subtract(ResourceProfile.UNKNOWN)).isEqualTo(ResourceProfile.UNKNOWN);
+ assertThat(ResourceProfile.UNKNOWN.subtract(ResourceProfile.UNKNOWN))
+ .isEqualTo(ResourceProfile.UNKNOWN);
}
- @Test(expected = IllegalArgumentException.class)
- public void testSubtractWithInfValues() {
+ @Test
+ void testSubtractWithInfValues() {
// Does not equals to ANY since it has extended resources.
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
@@ -457,11 +446,12 @@ public class ResourceProfileTest extends TestLogger {
.setNetworkMemoryMB(200)
.build();
- rp2.subtract(rp1);
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> rp2.subtract(rp1));
}
@Test
- public void testMultiply() {
+ void testMultiply() {
final int by = 3;
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
@@ -478,11 +468,11 @@ public class ResourceProfileTest extends TestLogger {
rp2 = rp2.merge(rp1);
}
- assertEquals(rp2, rp1.multiply(by));
+ assertThat(rp1.multiply(by)).isEqualTo(rp2);
}
@Test
- public void testMultiplyZero() {
+ void testMultiplyZero() {
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -493,10 +483,10 @@ public class ResourceProfileTest extends TestLogger {
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
.build();
- assertEquals(ResourceProfile.ZERO, rp1.multiply(0));
+ assertThat(rp1.multiply(0)).isEqualTo(ResourceProfile.ZERO);
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void testMultiplyNegative() {
final ResourceProfile rp =
ResourceProfile.newBuilder()
@@ -507,73 +497,66 @@ public class ResourceProfileTest extends TestLogger {
.setManagedMemoryMB(100)
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
.build();
- rp.multiply(-2);
+
+
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() ->
rp.multiply(-2));
}
@Test
- public void testFromSpecWithSerializationCopy() throws Exception {
+ void testFromSpecWithSerializationCopy() throws Exception {
final ResourceSpec copiedSpec =
CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
final ResourceProfile profile =
ResourceProfile.fromResourceSpec(copiedSpec);
- assertEquals(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN),
profile);
+
assertThat(profile).isEqualTo(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN));
}
@Test
- public void testSingletonPropertyOfUnknown() throws Exception {
+ void testSingletonPropertyOfUnknown() throws Exception {
final ResourceProfile copiedProfile =
CommonTestUtils.createCopySerializable(ResourceProfile.UNKNOWN);
- assertSame(ResourceProfile.UNKNOWN, copiedProfile);
+ assertThat(copiedProfile).isSameAs(ResourceProfile.UNKNOWN);
}
@Test
- public void testSingletonPropertyOfAny() throws Exception {
+ void testSingletonPropertyOfAny() throws Exception {
final ResourceProfile copiedProfile =
CommonTestUtils.createCopySerializable(ResourceProfile.ANY);
- assertSame(ResourceProfile.ANY, copiedProfile);
+ assertThat(copiedProfile).isSameAs(ResourceProfile.ANY);
}
@Test
- public void doesNotIncludeCPUAndMemoryInToStringIfTheyAreTooLarge() {
+ void doesNotIncludeCPUAndMemoryInToStringIfTheyAreTooLarge() {
double tooLargeCpuCount = MAX_CPU_CORE_NUMBER_TO_LOG.doubleValue() +
1.0;
ResourceProfile resourceProfile =
createResourceProfile(tooLargeCpuCount, TOO_LARGE_MEMORY);
- assertThat(
- resourceProfile.toString(),
- allOf(not(containsCPUCores()), not(containsTaskHeapMemory())));
+ assertThat(resourceProfile.toString())
+ .doesNotContain("cpuCores=")
+ .doesNotContain("taskHeapMemory=");
}
@Test
- public void includesCPUAndMemoryInToStringIfTheyAreBelowThreshold() {
+ void includesCPUAndMemoryInToStringIfTheyAreBelowThreshold() {
ResourceProfile resourceProfile = createResourceProfile(1.0,
MemorySize.ofMebiBytes(4));
- assertThat(resourceProfile.toString(), allOf(containsCPUCores(),
containsTaskHeapMemory()));
+
assertThat(resourceProfile.toString()).contains("cpuCores=").contains("taskHeapMemory=");
}
@Test
- public void testZeroExtendedResourceFromConstructor() {
+ void testZeroExtendedResourceFromConstructor() {
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 0.0))
.build();
- assertEquals(resourceProfile.getExtendedResources().size(), 0);
+ assertThat(resourceProfile.getExtendedResources()).isEmpty();
}
@Test
- public void testZeroExtendedResourceFromSubtract() {
+ void testZeroExtendedResourceFromSubtract() {
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
.setExtendedResource(new
ExternalResource(EXTERNAL_RESOURCE_NAME, 1.0))
.build();
-
assertEquals(resourceProfile.subtract(resourceProfile).getExtendedResources().size(),
0);
- }
-
- private Matcher<String> containsTaskHeapMemory() {
- return containsString("taskHeapMemory=");
- }
-
- private Matcher<String> containsCPUCores() {
- return containsString("cpuCores=");
+
assertThat(resourceProfile.subtract(resourceProfile).getExtendedResources()).isEmpty();
}
private static ResourceProfile createResourceProfile(double cpu,
MemorySize taskHeapMemory) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 651275afb70..38b52c29daa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -24,11 +24,12 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtilsTestBase;
import
org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemoryUtils;
-import org.apache.flink.testutils.logging.TestLoggerResource;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
-import org.hamcrest.MatcherAssert;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.util.HashMap;
@@ -37,23 +38,21 @@ import java.util.function.Consumer;
import static
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.JM_LEGACY_HEAP_OPTIONS;
import static
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.JM_PROCESS_MEMORY_OPTIONS;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.collection.IsArrayWithSize.arrayWithSize;
-import static org.hamcrest.collection.IsMapContaining.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/** Tests for {@link JobManagerProcessUtils}. */
-public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobManagerProcessSpec> {
+class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobManagerProcessSpec> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobManagerProcessUtilsTest.class);
+
private static final MemorySize JVM_HEAP_SIZE = MemorySize.parse("1152m");
private static final MemorySize TOTAL_FLINK_MEM_SIZE =
MemorySize.parse("1280m");
private static final MemorySize TOTAL_PROCESS_MEM_SIZE =
MemorySize.parse("1536m");
- @Rule
- public final TestLoggerResource testLoggerResource =
- new TestLoggerResource(JobManagerFlinkMemoryUtils.class,
Level.INFO);
+ @RegisterExtension
+ private final LoggerAuditingExtension testLoggerResource =
+ new LoggerAuditingExtension(JobManagerFlinkMemoryUtils.class,
Level.INFO);
public JobManagerProcessUtilsTest() {
super(
@@ -63,7 +62,7 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
}
@Test
- public void testGenerateDynamicConfigurations() {
+ void testGenerateDynamicConfigurations() {
Configuration config = new Configuration();
config.set(JobManagerOptions.JVM_HEAP_MEMORY, MemorySize.parse("1m"));
config.set(JobManagerOptions.OFF_HEAP_MEMORY, MemorySize.parse("2m"));
@@ -78,21 +77,16 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
Map<String, String> configs =
parseAndAssertJobManagerResourceDynamicConfig(dynamicConfigsStr);
- assertThat(
-
MemorySize.parse(configs.get(JobManagerOptions.JVM_HEAP_MEMORY.key())),
- is(jobManagerProcessSpec.getJvmHeapMemorySize()));
- assertThat(
-
MemorySize.parse(configs.get(JobManagerOptions.OFF_HEAP_MEMORY.key())),
- is(jobManagerProcessSpec.getJvmDirectMemorySize()));
- assertThat(
-
MemorySize.parse(configs.get(JobManagerOptions.JVM_METASPACE.key())),
- is(jobManagerProcessSpec.getJvmMetaspaceSize()));
- assertThat(
-
MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MIN.key())),
- is(jobManagerProcessSpec.getJvmOverheadSize()));
- assertThat(
-
MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MAX.key())),
- is(jobManagerProcessSpec.getJvmOverheadSize()));
+
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_HEAP_MEMORY.key())))
+ .isEqualTo(jobManagerProcessSpec.getJvmHeapMemorySize());
+
assertThat(MemorySize.parse(configs.get(JobManagerOptions.OFF_HEAP_MEMORY.key())))
+ .isEqualTo(jobManagerProcessSpec.getJvmDirectMemorySize());
+
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_METASPACE.key())))
+ .isEqualTo(jobManagerProcessSpec.getJvmMetaspaceSize());
+
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MIN.key())))
+ .isEqualTo(jobManagerProcessSpec.getJvmOverheadSize());
+
assertThat(MemorySize.parse(configs.get(JobManagerOptions.JVM_OVERHEAD_MAX.key())))
+ .isEqualTo(jobManagerProcessSpec.getJvmOverheadSize());
}
private static Map<String, String>
parseAndAssertJobManagerResourceDynamicConfig(
@@ -100,29 +94,29 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
Map<String, String> config = new HashMap<>();
String[] dynamicParameterTokens = dynamicParameterStr.split(" ");
- assertThat(dynamicParameterTokens.length % 2, is(0));
+ assertThat(dynamicParameterTokens.length % 2).isZero();
for (int i = 0; i < dynamicParameterTokens.length; ++i) {
String configStr = dynamicParameterTokens[i];
if (i % 2 == 0) {
- assertThat(configStr, is("-D"));
+ assertThat(configStr).isEqualTo("-D");
} else {
String[] configEntry = configStr.split("=");
- assertThat(configEntry, arrayWithSize(2));
+ assertThat(configEntry).hasSize(2);
config.put(configEntry[0], configEntry[1]);
}
}
- assertThat(config, hasKey(JobManagerOptions.JVM_HEAP_MEMORY.key()));
- assertThat(config, hasKey(JobManagerOptions.OFF_HEAP_MEMORY.key()));
- assertThat(config, hasKey(JobManagerOptions.JVM_METASPACE.key()));
- assertThat(config, hasKey(JobManagerOptions.JVM_OVERHEAD_MIN.key()));
- assertThat(config, hasKey(JobManagerOptions.JVM_OVERHEAD_MAX.key()));
+
assertThat(config).containsKey(JobManagerOptions.JVM_HEAP_MEMORY.key());
+
assertThat(config).containsKey(JobManagerOptions.OFF_HEAP_MEMORY.key());
+ assertThat(config).containsKey(JobManagerOptions.JVM_METASPACE.key());
+
assertThat(config).containsKey(JobManagerOptions.JVM_OVERHEAD_MIN.key());
+
assertThat(config).containsKey(JobManagerOptions.JVM_OVERHEAD_MAX.key());
return config;
}
@Test
- public void testConfigJvmHeapMemory() {
+ void testConfigJvmHeapMemory() {
MemorySize jvmHeapSize = MemorySize.parse("50m");
Configuration conf = new Configuration();
@@ -130,30 +124,31 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(conf);
- assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(),
is(jvmHeapSize));
+
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(jvmHeapSize);
}
@Test
- public void testLogFailureOfJvmHeapSizeMinSizeVerification() {
+ void testLogFailureOfJvmHeapSizeMinSizeVerification() {
MemorySize jvmHeapMemory = MemorySize.parse("50m");
Configuration conf = new Configuration();
conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
JobManagerProcessUtils.processSpecFromConfig(conf);
- MatcherAssert.assertThat(
- testLoggerResource.getMessages(),
- hasItem(
- containsString(
- String.format(
- "The configured or derived JVM heap
memory size (%s) is less than its recommended minimum value (%s)",
- jvmHeapMemory.toHumanReadableString(),
- JobManagerOptions.MIN_JVM_HEAP_SIZE
- .toHumanReadableString()))));
+
+ assertThat(testLoggerResource.getMessages())
+ .anyMatch(
+ str ->
+ str.contains(
+ String.format(
+ "The configured or derived JVM
heap memory size (%s) is less than its recommended minimum value (%s)",
+
jvmHeapMemory.toHumanReadableString(),
+
JobManagerOptions.MIN_JVM_HEAP_SIZE
+
.toHumanReadableString())));
}
@Test
- public void testConfigOffHeapMemory() {
+ void testConfigOffHeapMemory() {
MemorySize offHeapMemory = MemorySize.parse("100m");
Configuration conf = new Configuration();
@@ -162,12 +157,12 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
validateInAllConfigurationsWithoutExplicitTotalFlinkAndJvmHeapMem(
conf,
jobManagerProcessSpec ->
- assertThat(
-
jobManagerProcessSpec.getJvmDirectMemorySize(), is(offHeapMemory)));
+
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize())
+ .isEqualTo(offHeapMemory));
}
@Test
- public void testFlinkInternalMemorySizeAddUpFailure() {
+ void testFlinkInternalMemorySizeAddUpFailure() {
MemorySize totalFlinkMemory = MemorySize.parse("199m");
MemorySize jvmHeap = MemorySize.parse("100m");
MemorySize offHeapMemory = MemorySize.parse("100m");
@@ -181,7 +176,7 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
}
@Test
- public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+ void testJvmHeapExceedsTotalFlinkMemoryFailure() {
MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
@@ -193,7 +188,7 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
}
@Test
- public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+ void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
MemorySize defaultOffHeap =
JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
MemorySize expectedOffHeap =
MemorySize.ofMebiBytes(100).add(defaultOffHeap);
@@ -205,22 +200,22 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(conf);
- assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(),
is(expectedOffHeap));
- MatcherAssert.assertThat(
- testLoggerResource.getMessages(),
- hasItem(
- containsString(
- String.format(
- "The Off-Heap Memory size (%s) is
derived the configured Total Flink Memory size (%s) minus "
- + "the configured JVM Heap
Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
-
expectedOffHeap.toHumanReadableString(),
-
totalFlinkMemory.toHumanReadableString(),
- jvmHeap.toHumanReadableString(),
-
defaultOffHeap.toHumanReadableString()))));
+
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize()).isEqualTo(expectedOffHeap);
+ assertThat(testLoggerResource.getMessages())
+ .anyMatch(
+ str ->
+ str.contains(
+ String.format(
+ "The Off-Heap Memory size (%s)
is derived the configured Total Flink Memory size (%s) minus "
+ + "the configured JVM
Heap Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
+
expectedOffHeap.toHumanReadableString(),
+
totalFlinkMemory.toHumanReadableString(),
+
jvmHeap.toHumanReadableString(),
+
defaultOffHeap.toHumanReadableString())));
}
@Test
- public void testDeriveFromRequiredFineGrainedOptions() {
+ void testDeriveFromRequiredFineGrainedOptions() {
MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
MemorySize offHeap = MemorySize.ofMebiBytes(50);
MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(200);
@@ -233,7 +228,7 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(conf);
- assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(),
is(expectedOffHeap));
+
assertThat(jobManagerProcessSpec.getJvmDirectMemorySize()).isEqualTo(expectedOffHeap);
}
@Override
@@ -274,17 +269,17 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
private void validateInConfigWithExplicitJvmHeap(
Configuration customConfig, Consumer<JobManagerProcessSpec>
validateFunc) {
- log.info("Validating in configuration with explicit jvm heap.");
+ LOG.info("Validating in configuration with explicit jvm heap.");
Configuration config = configWithExplicitJvmHeap();
config.addAll(customConfig);
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(config);
- assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(),
is(JVM_HEAP_SIZE));
+
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(JVM_HEAP_SIZE);
validateFunc.accept(jobManagerProcessSpec);
}
private void validateFailInConfigWithExplicitJvmHeap(Configuration
customConfig) {
- log.info("Validating failing in configuration with explicit jvm
heap.");
+ LOG.info("Validating failing in configuration with explicit jvm
heap.");
Configuration config = configWithExplicitJvmHeap();
config.addAll(customConfig);
validateFail(config);
@@ -292,17 +287,17 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
private void validateInConfigWithExplicitTotalFlinkMem(
Configuration customConfig, Consumer<JobManagerProcessSpec>
validateFunc) {
- log.info("Validating in configuration with explicit total flink memory
size.");
+ LOG.info("Validating in configuration with explicit total flink memory
size.");
Configuration config = configWithExplicitTotalFlinkMem();
config.addAll(customConfig);
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(config);
- assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize(),
is(TOTAL_FLINK_MEM_SIZE));
+
assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize()).isEqualTo(TOTAL_FLINK_MEM_SIZE);
validateFunc.accept(jobManagerProcessSpec);
}
private void validateFailInConfigWithExplicitTotalFlinkMem(Configuration
customConfig) {
- log.info("Validating failing in configuration with explicit total
flink memory size.");
+ LOG.info("Validating failing in configuration with explicit total
flink memory size.");
Configuration config = configWithExplicitTotalFlinkMem();
config.addAll(customConfig);
validateFail(config);
@@ -310,19 +305,19 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
private void validateInConfigWithExplicitTotalFlinkAndJvmHeapMem(
Configuration customConfig, Consumer<JobManagerProcessSpec>
validateFunc) {
- log.info("Validating in configuration with explicit total flink and
jvm heap memory size.");
+ LOG.info("Validating in configuration with explicit total flink and
jvm heap memory size.");
Configuration config = configWithExplicitTotalFlinkAndJvmHeapMem();
config.addAll(customConfig);
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(config);
- assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize(),
is(TOTAL_FLINK_MEM_SIZE));
- assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(),
is(JVM_HEAP_SIZE));
+
assertThat(jobManagerProcessSpec.getTotalFlinkMemorySize()).isEqualTo(TOTAL_FLINK_MEM_SIZE);
+
assertThat(jobManagerProcessSpec.getJvmHeapMemorySize()).isEqualTo(JVM_HEAP_SIZE);
validateFunc.accept(jobManagerProcessSpec);
}
private void validateFailInConfigWithExplicitTotalFlinkAndJvmHeapMem(
Configuration customConfig) {
- log.info(
+ LOG.info(
"Validating failing in configuration with explicit total flink
and jvm heap memory size.");
Configuration config = configWithExplicitTotalFlinkAndJvmHeapMem();
config.addAll(customConfig);
@@ -331,17 +326,18 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
private void validateInConfigWithExplicitTotalProcessMem(
Configuration customConfig, Consumer<JobManagerProcessSpec>
validateFunc) {
- log.info("Validating in configuration with explicit total process
memory size.");
+ LOG.info("Validating in configuration with explicit total process
memory size.");
Configuration config = configWithExplicitTotalProcessMem();
config.addAll(customConfig);
JobManagerProcessSpec jobManagerProcessSpec =
JobManagerProcessUtils.processSpecFromConfig(config);
- assertThat(jobManagerProcessSpec.getTotalProcessMemorySize(),
is(TOTAL_PROCESS_MEM_SIZE));
+ assertThat(jobManagerProcessSpec.getTotalProcessMemorySize())
+ .isEqualTo(TOTAL_PROCESS_MEM_SIZE);
validateFunc.accept(jobManagerProcessSpec);
}
private void validateFailInConfigWithExplicitTotalProcessMem(Configuration
customConfig) {
- log.info("Validating failing in configuration with explicit total
process memory size.");
+ LOG.info("Validating failing in configuration with explicit total
process memory size.");
Configuration config = configWithExplicitTotalProcessMem();
config.addAll(customConfig);
validateFail(config);
@@ -349,12 +345,9 @@ public class JobManagerProcessUtilsTest extends
ProcessMemoryUtilsTestBase<JobMa
@Override
protected void validateFail(Configuration config) {
- try {
- JobManagerProcessUtils.processSpecFromConfig(config);
- fail("Configuration did not fail as expected.");
- } catch (IllegalConfigurationException e) {
- // expected
- }
+ assertThatExceptionOfType(IllegalConfigurationException.class)
+ .as("Configuration did not fail as expected.")
+ .isThrownBy(() ->
JobManagerProcessUtils.processSpecFromConfig(config));
}
@Override