This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1d80b12834b2f11a64d61ef489ea0c9e38357524 Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Mon Oct 21 16:28:57 2019 +0800 [FLINK-13986][test] Fix test cases missing explicit task executor resource configurations. FLIP-49 requires either one of the following three size(s) to be explicitly configured. - Task Heap Memory and Managed Memory - Total Flink Memory - Total Process Memory This commit fix test cases that fail due to all the above three are missing, by setting a reasonable Total Flink Memory size. --- .../src/main/java/org/apache/flink/client/LocalExecutor.java | 2 ++ .../apache/flink/streaming/connectors/kafka/KafkaTestBase.java | 3 ++- .../runtime/state/TaskExecutorLocalStateStoresManagerTest.java | 4 +++- .../flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java | 9 ++++++--- .../apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java | 7 +++++-- .../test/recovery/JobManagerHAProcessFailureRecoveryITCase.java | 3 ++- 6 files changed, 20 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index b1d3330..7ff020c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.minicluster.JobExecutorService; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -64,6 +65,7 @@ public class LocalExecutor extends PlanExecutor { if (!configuration.contains(RestOptions.BIND_PORT)) { configuration.setString(RestOptions.BIND_PORT, "0"); } + TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration); int numTaskManagers = configuration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 94d9133..85f8f2f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -117,7 +117,8 @@ public abstract class KafkaTestBase extends TestLogger { protected static Configuration getFlinkConfiguration() { Configuration flinkConfig = new Configuration(); - flinkConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "16m"); + flinkConfig.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "16m"); + flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); return flinkConfig; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index d9eac20..6191d49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; @@ -46,6 +47,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024; + private static final int TOTAL_FLINK_MEMORY_MB = 512; /** * This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory @@ -205,7 +207,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration( Configuration config) throws IOException { return TaskManagerServicesConfiguration.fromConfiguration( - config, + TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(config), ResourceID.generate(), InetAddress.getLocalHost(), MEM_SIZE_PARAM, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java index 4f77721..8bc60a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; +import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -57,6 +58,8 @@ public class TaskManagerRunnerStartupTest extends TestLogger { private static final String LOCAL_HOST = "localhost"; + private static final int TOTAL_FLINK_MEMORY_MB = 512; + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -87,7 +90,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger { nonWritable.setWritable(false, false)); try { - Configuration cfg = new Configuration(); + Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration()); cfg.setString(CoreOptions.TMP_DIRS, nonWritable.getAbsolutePath()); try { @@ -117,7 +120,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger { */ @Test public void testMemoryConfigWrong() throws Exception { - Configuration cfg = new Configuration(); + Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration()); // something invalid cfg.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "-42m"); @@ -142,7 +145,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger { final ServerSocket blocker = new ServerSocket(0, 50, InetAddress.getByName(LOCAL_HOST)); try { - final Configuration cfg = new Configuration(); + final Configuration cfg = adjustMemoryConfigurationForLocalExecution(new Configuration()); cfg.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, blocker.getLocalPort()); startTaskManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java index 4110e5f..9777186 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager; import org.apache.flink.util.TestLogger; @@ -91,8 +92,10 @@ public class TaskManagerRunnerTest extends TestLogger { } private static TaskManagerRunner createTaskManagerRunner(final Configuration configuration) throws Exception { - TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate()); + TaskManagerRunner taskManagerRunner = new TaskManagerRunner( + TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution(configuration), + ResourceID.generate()); taskManagerRunner.start(); return taskManagerRunner; } -} \ No newline at end of file +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java index 130d38e..9ef2d82 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java @@ -249,7 +249,8 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger { Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( zooKeeper.getConnectString(), zookeeperStoragePath.getPath()); // Task manager configuration - config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m"); + config.setString(TaskManagerOptions.TASK_HEAP_MEMORY, "1m"); + config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 100); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);