This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4697f7e41acfb6b76bb7a67d9d19bc577a023c5 Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Sat Jul 13 08:30:15 2019 +0800 [FLINK-13241][yarn/mesos] Fix Yarn/MesosResourceManager setting managed memory size into the wrong configuration instance. [FLINK-13241][yarn][test] Update YarnResourceManagerTest#testCreateSlotsPerWorker to compute tmCalculatedResourceProfile based on the RM altered configuration. [FLINK-13241][yarn][test] Update YarnConfigurationITCase to verify that TMs are started with correct managed memory size. [FLINK-13241][runtime] Calculate and set managed memory size outside of ResourceManager. [FLINK-13241][runtime/yarn][test] Move YarnResourceManagerTest#testCreateSlotsPerWorker to ResourceManagerTest#testCreateWorkerSlotProfiles, and update to verify slot profile calculation with determinate managed memory size. [FLINK-13241][runtime] Move getResourceManagerConfiguration from ResourceManagerFactory to ResourceManagerUtil. This closes #9105. --- .../clusterframework/MesosResourceManager.java | 8 +--- ...tDispatcherResourceManagerComponentFactory.java | 4 +- .../runtime/resourcemanager/ResourceManager.java | 18 ++------- .../flink/runtime/util/ResourceManagerUtil.java | 46 +++++++++++++++++++++ .../resourcemanager/ResourceManagerTest.java | 39 ++++++++++++++---- .../apache/flink/yarn/YarnConfigurationITCase.java | 12 ++++++ .../org/apache/flink/yarn/YarnResourceManager.java | 9 +---- .../apache/flink/yarn/YarnResourceManagerTest.java | 47 ---------------------- 8 files changed, 101 insertions(+), 82 deletions(-) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index ad0ed58..0bb8d41 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -42,7 +42,6 @@ import org.apache.flink.mesos.util.MesosArtifactServer; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContainerSpecification; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -184,8 +183,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN this.mesosServices = Preconditions.checkNotNull(mesosServices); this.actorSystem = Preconditions.checkNotNull(mesosServices.getLocalActorSystem()); - // copy the config, because we might change it for the TaskManagers - this.flinkConfig = new Configuration(Preconditions.checkNotNull(flinkConfig)); + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); this.mesosConfig = Preconditions.checkNotNull(mesosConfig); this.artifactServer = Preconditions.checkNotNull(mesosServices.getArtifactServer()); @@ -198,9 +196,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); - final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters(); - this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles( - flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots()); + this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig); setFailUnfulfillableRequest(true); } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java index 163b039..8b2d9c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.util.ResourceManagerUtil; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; @@ -167,8 +168,9 @@ public abstract class AbstractDispatcherResourceManagerComponentFactory<T extend hostname, ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); + Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration); resourceManager = resourceManagerFactory.createResourceManager( - configuration, + resourceManagerConfig, ResourceID.generate(), rpcService, highAvailabilityServices, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3ee321f..1a77a98 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -23,10 +23,10 @@ 
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; @@ -1203,22 +1203,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> // Helper methods // ------------------------------------------------------------------------ - @VisibleForTesting - public static Collection<ResourceProfile> updateTaskManagerConfigAndCreateWorkerSlotProfiles( - Configuration config, long totalMemoryMB, int numSlots) { - - final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB); - final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 20; // megabytes to bytes - final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(config, processMemoryBytes); - - updateFlinkConfForManagedMemory(config, managedMemoryBytes); + public static Collection<ResourceProfile> createWorkerSlotProfiles(Configuration config) { + final int numSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + final long managedMemoryBytes = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes(); final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes); return Collections.nCopies(numSlots, resourceProfile); } - - static void updateFlinkConfForManagedMemory(Configuration conf, 
long managedMemorySize) { - conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemorySize + "b"); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java new file mode 100644 index 0000000..7a65336 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; + +/** + * Utils for ResourceManager. 
+ */ +public class ResourceManagerUtil { + + public static Configuration getResourceManagerConfiguration(Configuration flinkConfig) { + final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes(); + final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, taskManagerMemoryMB); + final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes + final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, processMemoryBytes); + + final Configuration resourceManagerConfig = new Configuration(flinkConfig); + resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemoryBytes + "b"); + + return resourceManagerConfig; + } + + private ResourceManagerUtil() { + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 95e43fc..a9ce427 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -20,7 +20,11 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.HardwareDescription; @@ -38,6 +42,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; @@ -46,7 +51,6 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -60,6 +64,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** @@ -151,12 +156,12 @@ public class ResourceManagerTest extends TestLogger { TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get(); - Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId()); - Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription()); - Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress()); - Assert.assertEquals(dataPort, taskManagerInfo.getDataPort()); - Assert.assertEquals(0, taskManagerInfo.getNumberSlots()); - Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots()); + assertEquals(taskManagerId, taskManagerInfo.getResourceId()); + assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription()); + assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress()); + assertEquals(dataPort, taskManagerInfo.getDataPort()); + assertEquals(0, taskManagerInfo.getNumberSlots()); + assertEquals(0, taskManagerInfo.getNumberAvailableSlots()); } 
private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception { @@ -270,4 +275,24 @@ public class ResourceManagerTest extends TestLogger { return resourceManager; } + + /** + * Tests that RM and TM create the same slot resource profiles. + */ + @Test + public void testCreateWorkerSlotProfiles() { + final Configuration config = new Configuration(); + config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "100m"); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5); + + final ResourceProfile rmCalculatedResourceProfile = + ResourceManager.createWorkerSlotProfiles(config).iterator().next(); + + final ResourceProfile tmCalculatedResourceProfile = + TaskManagerServices.computeSlotResourceProfile( + config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS), + MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes()); + + assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile); + } } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index 86b0052..63ff2b2 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -25,8 +25,10 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.RestClient; @@ -37,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ResourceManagerUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -187,6 +190,10 @@ public class YarnConfigurationITCase extends YarnTestBase { assertThat( (double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize, is(closeTo(1.0, 0.15))); + + final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration); + + assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB)); } finally { restClient.shutdown(TIMEOUT); clusterClient.shutdown(); @@ -208,4 +215,9 @@ public class YarnConfigurationITCase extends YarnTestBase { return taskManagerInfo.getNumberSlots() > 0; } } + + private static int calculateManagedMemorySizeMB(Configuration configuration) { + Configuration resourceManagerConfig = ResourceManagerUtil.getResourceManagerConfiguration(configuration); + return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes(); + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 1bb29cb..e9a1f2d 100755 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -158,7 +158,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme clusterInformation, fatalErrorHandler, 
jobManagerMetricGroup); - this.flinkConfig = new Configuration(flinkConfig); // copy, because we alter the config + this.flinkConfig = flinkConfig; this.yarnConfig = new YarnConfiguration(); this.env = env; this.workerNodeMap = new ConcurrentHashMap<>(); @@ -184,7 +184,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus); - this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots); + this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig); setFailUnfulfillableRequest(true); } @@ -323,11 +323,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme return resource; } - @VisibleForTesting - Collection<ResourceProfile> getSlotsPerWorker() { - return slotsPerWorker; - } - @Override public boolean stopWorker(final YarnWorkerNode workerNode) { final Container container = workerNode.getContainer(); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 19b2f67..df7d85b 100755 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -21,10 +21,7 @@ package org.apache.flink.yarn; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.ResourceManagerOptions; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -51,7 +48,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; -import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.RunnableWithException; @@ -484,47 +480,4 @@ public class YarnResourceManagerTest extends TestLogger { }); }}; } - - /** - * Tests that RM and TM calculate same slot resource profile. - */ - @Test - public void testCreateSlotsPerWorker() throws Exception { - testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 100)); - - Configuration config1 = new Configuration(); - config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5); - testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 10)); - - Configuration config2 = new Configuration(); - config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "789m"); - testCreateSlotsPerWorker(config2, Resource.newInstance(800, 50)); - - Configuration config3 = new Configuration(); - config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "300m"); - config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true); - testCreateSlotsPerWorker(config3, Resource.newInstance(2000, 60)); - - Configuration config4 = new Configuration(); - config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "10m"); - config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "10m"); - config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true); - testCreateSlotsPerWorker(config4, Resource.newInstance(1000, 1)); - } - - private void testCreateSlotsPerWorker(Configuration config, Resource resource) 
throws Exception { - new Context(config) {{ - runTest(() -> { - - ResourceProfile rmCalculatedResourceProfile = resourceManager.getSlotsPerWorker().iterator().next(); - - ResourceProfile tmCalculatedResourceProfile = - TaskManagerServices.computeSlotResourceProfile( - config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS), - MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes()); - - assertEquals(rmCalculatedResourceProfile, tmCalculatedResourceProfile); - }); - }}; - } }