azagrebin commented on a change in pull request #12516:
URL: https://github.com/apache/flink/pull/12516#discussion_r436707025



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
##########
@@ -42,19 +49,37 @@
        private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("1280m");
        private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1536m");
 
+       @Rule
+       public final TestLoggerResource testLoggerResource = new 
TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+
        public JobManagerProcessUtilsTest() {
                super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, 
JobManagerOptions.TOTAL_PROCESS_MEMORY);
        }
 
        @Test
        public void testConfigJvmHeapMemory() {
-               MemorySize jvmHeapSize = MemorySize.parse("50m");
+               MemorySize jvmHeapMemory = MemorySize.parse("50m");
 
                Configuration conf = new Configuration();
-               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapSize);
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
+
+               JobManagerProcessSpec jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
+               assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapMemory));
+               MatcherAssert.assertThat(

Review comment:
       I think that, for readability, the logging test should be a separate test case.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
##########
@@ -91,13 +91,13 @@ private static void verifyJvmHeapSize(MemorySize 
jvmHeapSize) {
        private static void verifyJobStoreCacheSize(Configuration config, 
MemorySize jvmHeapSize) {
                MemorySize jobStoreCacheHeapSize =
                        
MemorySize.parse(config.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE) + "b");
-               if (jvmHeapSize.compareTo(jobStoreCacheHeapSize) < 1) {
+               if (jvmHeapSize.compareTo(jobStoreCacheHeapSize) < 0) {

Review comment:
       Same here: why zero?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
##########
@@ -42,19 +49,37 @@
        private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("1280m");
        private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1536m");
 
+       @Rule
+       public final TestLoggerResource testLoggerResource = new 
TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+
        public JobManagerProcessUtilsTest() {
                super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, 
JobManagerOptions.TOTAL_PROCESS_MEMORY);
        }
 
        @Test
        public void testConfigJvmHeapMemory() {
-               MemorySize jvmHeapSize = MemorySize.parse("50m");
+               MemorySize jvmHeapMemory = MemorySize.parse("50m");
 
                Configuration conf = new Configuration();
-               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapSize);
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
+
+               JobManagerProcessSpec jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
+               assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapMemory));
+               MatcherAssert.assertThat(
+                       testLoggerResource.getMessages(),
+                       hasItem(containsString("The configured or derived JVM 
heap memory size (50.000mb (52428800 bytes)) is less than its recommended 
minimum value (128 mb)")));

Review comment:
        I think it would be better to also use 
`jvmHeapSize.toHumanReadableString()` and 
`JobManagerOptions.MIN_JVM_HEAP_SIZE.toHumanReadableString()` in the test, so 
that we do not need to update the hard-coded values if these sizes change.
   
   Btw, it looks like we forgot to add `toHumanReadableString` for 
`JobManagerOptions.MIN_JVM_HEAP_SIZE` in `verifyJvmHeapSize()`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
##########
@@ -42,19 +49,37 @@
        private static final MemorySize TOTAL_FLINK_MEM_SIZE = 
MemorySize.parse("1280m");
        private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1536m");
 
+       @Rule
+       public final TestLoggerResource testLoggerResource = new 
TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+
        public JobManagerProcessUtilsTest() {
                super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, 
JobManagerOptions.TOTAL_PROCESS_MEMORY);
        }
 
        @Test
        public void testConfigJvmHeapMemory() {
-               MemorySize jvmHeapSize = MemorySize.parse("50m");
+               MemorySize jvmHeapMemory = MemorySize.parse("50m");
 
                Configuration conf = new Configuration();
-               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapSize);
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
+
+               JobManagerProcessSpec jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
+               assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapMemory));
+               MatcherAssert.assertThat(
+                       testLoggerResource.getMessages(),
+                       hasItem(containsString("The configured or derived JVM 
heap memory size (50.000mb (52428800 bytes)) is less than its recommended 
minimum value (128 mb)")));
+
+               jvmHeapMemory = MemorySize.parse("150m");
+
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
+               conf.set(JobManagerOptions.JOB_STORE_CACHE_SIZE, 200L * 1024L * 
1024L);
 
-               JobManagerProcessSpec JobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
-               assertThat(JobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapSize));
+               jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
+               assertThat(jobManagerProcessSpec.getJvmHeapMemorySize(), 
is(jvmHeapMemory));
+               MatcherAssert.assertThat(

Review comment:
       Same here, this should be a separate test case.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
##########
@@ -75,12 +75,12 @@ private static JobManagerFlinkMemory 
createJobManagerFlinkMemory(
                        MemorySize jvmHeap,
                        MemorySize offHeapMemory) {
                verifyJvmHeapSize(jvmHeap);
-               verifyJobStoreCacheSize(config, offHeapMemory);
+               verifyJobStoreCacheSize(config, jvmHeap);
                return new JobManagerFlinkMemory(jvmHeap, offHeapMemory);
        }
 
        private static void verifyJvmHeapSize(MemorySize jvmHeapSize) {
-               if (jvmHeapSize.compareTo(JobManagerOptions.MIN_JVM_HEAP_SIZE) 
< 1) {
+               if (jvmHeapSize.compareTo(JobManagerOptions.MIN_JVM_HEAP_SIZE) 
< 0) {

Review comment:
       Why was this changed? I think zero is also OK, meaning that `jvmHeapSize` 
can be exactly `MIN_JVM_HEAP_SIZE`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
##########
@@ -75,12 +75,12 @@ private static JobManagerFlinkMemory 
createJobManagerFlinkMemory(
                        MemorySize jvmHeap,
                        MemorySize offHeapMemory) {
                verifyJvmHeapSize(jvmHeap);
-               verifyJobStoreCacheSize(config, offHeapMemory);
+               verifyJobStoreCacheSize(config, jvmHeap);
                return new JobManagerFlinkMemory(jvmHeap, offHeapMemory);
        }
 
        private static void verifyJvmHeapSize(MemorySize jvmHeapSize) {
-               if (jvmHeapSize.compareTo(JobManagerOptions.MIN_JVM_HEAP_SIZE) 
< 1) {
+               if (jvmHeapSize.compareTo(JobManagerOptions.MIN_JVM_HEAP_SIZE) 
< 0) {

Review comment:
       True, sorry for the confusion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to