This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d80b12834b2f11a64d61ef489ea0c9e38357524
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Mon Oct 21 16:28:57 2019 +0800

    [FLINK-13986][test] Fix test cases missing explicit task executor resource 
configurations.
    
    FLIP-49 requires either one of the following three size(s) to be explicitly 
configured.
    - Task Heap Memory and Managed Memory
    - Total Flink Memory
    - Total Process Memory
    
    This commit fix test cases that fail due to all the above three are 
missing, by setting a reasonable Total Flink Memory size.
---
 .../src/main/java/org/apache/flink/client/LocalExecutor.java     | 2 ++
 .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java   | 3 ++-
 .../runtime/state/TaskExecutorLocalStateStoresManagerTest.java   | 4 +++-
 .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 9 ++++++---
 .../apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java | 7 +++++--
 .../test/recovery/JobManagerHAProcessFailureRecoveryITCase.java  | 3 ++-
 6 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index b1d3330..7ff020c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -64,6 +65,7 @@ public class LocalExecutor extends PlanExecutor {
                if (!configuration.contains(RestOptions.BIND_PORT)) {
                        configuration.setString(RestOptions.BIND_PORT, "0");
                }
+               
TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration);
 
                int numTaskManagers = configuration.getInteger(
                                ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 94d9133..85f8f2f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -117,7 +117,8 @@ public abstract class KafkaTestBase extends TestLogger {
 
        protected static Configuration getFlinkConfiguration() {
                Configuration flinkConfig = new Configuration();
-               
flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m");
+               flinkConfig.setString(TaskManagerOptions.TASK_HEAP_MEMORY, 
"16m");
+               flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"16m");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                return flinkConfig;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index d9eac20..6191d49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -46,6 +47,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends 
TestLogger {
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
+       private static final int TOTAL_FLINK_MEMORY_MB = 512;
 
        /**
         * This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
@@ -205,7 +207,7 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
        private TaskManagerServicesConfiguration 
createTaskManagerServiceConfiguration(
                        Configuration config) throws IOException {
                return TaskManagerServicesConfiguration.fromConfiguration(
-                       config,
+                       
TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(config),
                        ResourceID.generate(),
                        InetAddress.getLocalHost(),
                        MEM_SIZE_PARAM,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 4f77721..8bc60a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 
+import static 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -57,6 +58,8 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 
        private static final String LOCAL_HOST = "localhost";
 
+       private static final int TOTAL_FLINK_MEMORY_MB = 512;
+
        @Rule
        public final TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -87,7 +90,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
                        nonWritable.setWritable(false, false));
 
                try {
-                       Configuration cfg = new Configuration();
+                       Configuration cfg = 
adjustMemoryConfigurationForLocalExecution(new Configuration());
                        cfg.setString(CoreOptions.TMP_DIRS, 
nonWritable.getAbsolutePath());
 
                        try {
@@ -117,7 +120,7 @@ public class TaskManagerRunnerStartupTest extends 
TestLogger {
         */
        @Test
        public void testMemoryConfigWrong() throws Exception {
-               Configuration cfg = new Configuration();
+               Configuration cfg = 
adjustMemoryConfigurationForLocalExecution(new Configuration());
 
                // something invalid
                cfg.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, 
"-42m");
@@ -142,7 +145,7 @@ public class TaskManagerRunnerStartupTest extends 
TestLogger {
                final ServerSocket blocker = new ServerSocket(0, 50, 
InetAddress.getByName(LOCAL_HOST));
 
                try {
-                       final Configuration cfg = new Configuration();
+                       final Configuration cfg = 
adjustMemoryConfigurationForLocalExecution(new Configuration());
                        
cfg.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 
blocker.getLocalPort());
 
                        startTaskManager(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 4110e5f..9777186 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.TestLogger;
@@ -91,8 +92,10 @@ public class TaskManagerRunnerTest extends TestLogger {
        }
 
        private static TaskManagerRunner createTaskManagerRunner(final 
Configuration configuration) throws Exception {
-               TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, ResourceID.generate());
+               TaskManagerRunner taskManagerRunner = new TaskManagerRunner(
+                       
TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration),
+                       ResourceID.generate());
                taskManagerRunner.start();
                return taskManagerRunner;
        }
-}
\ No newline at end of file
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 130d38e..9ef2d82 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -249,7 +249,8 @@ public class JobManagerHAProcessFailureRecoveryITCase 
extends TestLogger {
                Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
                        zooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
                // Task manager configuration
-               config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, 
"4m");
+               config.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "1m");
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
                
config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
                config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 

Reply via email to