This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4697f7e41acfb6b76bb7a67d9d19bc577a023c5
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Sat Jul 13 08:30:15 2019 +0800

    [FLINK-13241][yarn/mesos] Fix Yarn/MesosResourceManager setting managed 
memory size into wrong configuration instance.
    
    [FLINK-13241][yarn][test] Update 
YarnResourceManagerTest#testCreateSlotsPerWorker to compute 
tmCalculatedResourceProfile based on the RM altered configuration.
    
    [FLINK-13241][yarn][test] Update YarnConfigurationITCase to verify that TMs 
are started with correct managed memory size.
    
    [FLINK-13241][runtime] Calculate and set managed memory size outside of 
ResourceManager.
    
    [FLINK-13241][runtime/yarn][test] Move 
YarnResourceManagerTest#testCreateSlotsPerWorker to 
ResourceManagerTest#testCreateWorkerSlotProfiles, and update to verify slot 
profile calculation with determinate managed memory size.
    
    [FLINK-13241][runtime] Move getResourceManagerConfiguration from 
ResourceManagerFactory to ResourceManagerUtil.
    
    This closes #9105.
---
 .../clusterframework/MesosResourceManager.java     |  8 +---
 ...tDispatcherResourceManagerComponentFactory.java |  4 +-
 .../runtime/resourcemanager/ResourceManager.java   | 18 ++-------
 .../flink/runtime/util/ResourceManagerUtil.java    | 46 +++++++++++++++++++++
 .../resourcemanager/ResourceManagerTest.java       | 39 ++++++++++++++----
 .../apache/flink/yarn/YarnConfigurationITCase.java | 12 ++++++
 .../org/apache/flink/yarn/YarnResourceManager.java |  9 +----
 .../apache/flink/yarn/YarnResourceManagerTest.java | 47 ----------------------
 8 files changed, 101 insertions(+), 82 deletions(-)

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index ad0ed58..0bb8d41 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -42,7 +42,6 @@ import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -184,8 +183,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                this.mesosServices = Preconditions.checkNotNull(mesosServices);
                this.actorSystem = 
Preconditions.checkNotNull(mesosServices.getLocalActorSystem());
 
-               // copy the config, because we might change it for the 
TaskManagers
-               this.flinkConfig = new 
Configuration(Preconditions.checkNotNull(flinkConfig));
+               this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
                this.mesosConfig = Preconditions.checkNotNull(mesosConfig);
 
                this.artifactServer = 
Preconditions.checkNotNull(mesosServices.getArtifactServer());
@@ -198,9 +196,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                this.workersInLaunch = new HashMap<>(8);
                this.workersBeingReturned = new HashMap<>(8);
 
-               final ContaineredTaskManagerParameters 
containeredTaskManagerParameters = 
taskManagerParameters.containeredParameters();
-               this.slotsPerWorker = 
updateTaskManagerConfigAndCreateWorkerSlotProfiles(
-                       flinkConfig, 
containeredTaskManagerParameters.taskManagerTotalMemoryMB(), 
containeredTaskManagerParameters.numSlots());
+               this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
                setFailUnfulfillableRequest(true);
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 163b039..8b2d9c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -167,8 +168,9 @@ public abstract class 
AbstractDispatcherResourceManagerComponentFactory<T extend
                                hostname,
                                
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 
+                       Configuration resourceManagerConfig = 
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
                        resourceManager = 
resourceManagerFactory.createResourceManager(
-                               configuration,
+                               resourceManagerConfig,
                                ResourceID.generate(),
                                rpcService,
                                highAvailabilityServices,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3ee321f..1a77a98 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1203,22 +1203,12 @@ public abstract class ResourceManager<WorkerType 
extends ResourceIDRetrievable>
        //  Helper methods
        // 
------------------------------------------------------------------------
 
-       @VisibleForTesting
-       public static Collection<ResourceProfile> 
updateTaskManagerConfigAndCreateWorkerSlotProfiles(
-                       Configuration config, long totalMemoryMB, int numSlots) 
{
-
-               final long cutoffMB = 
ContaineredTaskManagerParameters.calculateCutoffMB(config, totalMemoryMB);
-               final long processMemoryBytes = (totalMemoryMB - cutoffMB) << 
20; // megabytes to bytes
-               final long managedMemoryBytes = 
TaskManagerServices.getManagedMemoryFromProcessMemory(config, 
processMemoryBytes);
-
-               updateFlinkConfForManagedMemory(config, managedMemoryBytes);
+       public static Collection<ResourceProfile> 
createWorkerSlotProfiles(Configuration config) {
+               final int numSlots = 
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+               final long managedMemoryBytes = 
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes();
 
                final ResourceProfile resourceProfile = 
TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
                return Collections.nCopies(numSlots, resourceProfile);
        }
-
-       static void updateFlinkConfForManagedMemory(Configuration conf, long 
managedMemorySize) {
-               conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
managedMemorySize + "b");
-       }
 }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
new file mode 100644
index 0000000..7a65336
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ResourceManagerUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+
+/**
+ * Utils for ResourceManager.
+ */
+public class ResourceManagerUtil {
+
+       public static Configuration 
getResourceManagerConfiguration(Configuration flinkConfig) {
+               final int taskManagerMemoryMB = 
ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+               final long cutoffMB = 
ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfig, 
taskManagerMemoryMB);
+               final long processMemoryBytes = (taskManagerMemoryMB - 
cutoffMB) << 20; // megabytes to bytes
+               final long managedMemoryBytes = 
TaskManagerServices.getManagedMemoryFromProcessMemory(flinkConfig, 
processMemoryBytes);
+
+               final Configuration resourceManagerConfig = new 
Configuration(flinkConfig);
+               
resourceManagerConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
managedMemoryBytes + "b");
+
+               return resourceManagerConfig;
+       }
+
+       private ResourceManagerUtil() {
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 95e43fc..a9ce427 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -20,7 +20,11 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -38,6 +42,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -46,7 +51,6 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,6 +64,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -151,12 +156,12 @@ public class ResourceManagerTest extends TestLogger {
 
                TaskManagerInfo taskManagerInfo = taskManagerInfoFuture.get();
 
-               Assert.assertEquals(taskManagerId, 
taskManagerInfo.getResourceId());
-               Assert.assertEquals(hardwareDescription, 
taskManagerInfo.getHardwareDescription());
-               Assert.assertEquals(taskExecutorGateway.getAddress(), 
taskManagerInfo.getAddress());
-               Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
-               Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
-               Assert.assertEquals(0, 
taskManagerInfo.getNumberAvailableSlots());
+               assertEquals(taskManagerId, taskManagerInfo.getResourceId());
+               assertEquals(hardwareDescription, 
taskManagerInfo.getHardwareDescription());
+               assertEquals(taskExecutorGateway.getAddress(), 
taskManagerInfo.getAddress());
+               assertEquals(dataPort, taskManagerInfo.getDataPort());
+               assertEquals(0, taskManagerInfo.getNumberSlots());
+               assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
        }
 
        private void registerTaskExecutor(ResourceManagerGateway 
resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) 
throws Exception {
@@ -270,4 +275,24 @@ public class ResourceManagerTest extends TestLogger {
 
                return resourceManager;
        }
+
+       /**
+        * Tests that RM and TM create the same slot resource profiles.
+        */
+       @Test
+       public void testCreateWorkerSlotProfiles() {
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"100m");
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
+
+               final ResourceProfile rmCalculatedResourceProfile =
+                       
ResourceManager.createWorkerSlotProfiles(config).iterator().next();
+
+               final ResourceProfile tmCalculatedResourceProfile =
+                       TaskManagerServices.computeSlotResourceProfile(
+                               
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
+                               
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
+
+               assertEquals(rmCalculatedResourceProfile, 
tmCalculatedResourceProfile);
+       }
 }
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 86b0052..63ff2b2 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -25,8 +25,10 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.RestClient;
@@ -37,6 +39,7 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ResourceManagerUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -187,6 +190,10 @@ public class YarnConfigurationITCase extends YarnTestBase {
                                        assertThat(
                                                (double) 
taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) 
expectedHeadSize,
                                                is(closeTo(1.0, 0.15)));
+
+                                       final int expectedManagedMemoryMB = 
calculateManagedMemorySizeMB(configuration);
+
+                                       assertThat((int) 
(taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), 
is(expectedManagedMemoryMB));
                                } finally {
                                        restClient.shutdown(TIMEOUT);
                                        clusterClient.shutdown();
@@ -208,4 +215,9 @@ public class YarnConfigurationITCase extends YarnTestBase {
                        return taskManagerInfo.getNumberSlots() > 0;
                }
        }
+
+       private static int calculateManagedMemorySizeMB(Configuration 
configuration) {
+               Configuration resourceManagerConfig = 
ResourceManagerUtil.getResourceManagerConfiguration(configuration);
+               return 
MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getMebiBytes();
+       }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 1bb29cb..e9a1f2d 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -158,7 +158,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                        clusterInformation,
                        fatalErrorHandler,
                        jobManagerMetricGroup);
-               this.flinkConfig  = new Configuration(flinkConfig); // copy, 
because we alter the config
+               this.flinkConfig  = flinkConfig;
                this.yarnConfig = new YarnConfiguration();
                this.env = env;
                this.workerNodeMap = new ConcurrentHashMap<>();
@@ -184,7 +184,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                this.defaultCpus = 
flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
                this.resource = 
Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
-               this.slotsPerWorker = 
updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, 
defaultTaskManagerMemoryMB, numberOfTaskSlots);
+               this.slotsPerWorker = createWorkerSlotProfiles(flinkConfig);
                setFailUnfulfillableRequest(true);
        }
 
@@ -323,11 +323,6 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                return resource;
        }
 
-       @VisibleForTesting
-       Collection<ResourceProfile> getSlotsPerWorker() {
-               return slotsPerWorker;
-       }
-
        @Override
        public boolean stopWorker(final YarnWorkerNode workerNode) {
                final Container container = workerNode.getContainer();
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 19b2f67..df7d85b 100755
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -21,10 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -51,7 +48,6 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
@@ -484,47 +480,4 @@ public class YarnResourceManagerTest extends TestLogger {
                        });
                }};
        }
-
-       /**
-        * Tests that RM and TM calculate same slot resource profile.
-        */
-       @Test
-       public void testCreateSlotsPerWorker() throws Exception {
-               testCreateSlotsPerWorker(flinkConfig, Resource.newInstance(500, 
100));
-
-               Configuration config1 = new Configuration();
-               config1.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 5);
-               testCreateSlotsPerWorker(config1, Resource.newInstance(1000, 
10));
-
-               Configuration config2 = new Configuration();
-               config2.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"789m");
-               testCreateSlotsPerWorker(config2,  Resource.newInstance(800, 
50));
-
-               Configuration config3 = new Configuration();
-               config3.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"300m");
-               config3.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
-               testCreateSlotsPerWorker(config3,  Resource.newInstance(2000, 
60));
-
-               Configuration config4 = new Configuration();
-               
config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, 
"10m");
-               
config4.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, 
"10m");
-               config4.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
-               testCreateSlotsPerWorker(config4,  Resource.newInstance(1000, 
1));
-       }
-
-       private void testCreateSlotsPerWorker(Configuration config, Resource 
resource) throws  Exception {
-               new Context(config) {{
-                       runTest(() -> {
-
-                               ResourceProfile rmCalculatedResourceProfile = 
resourceManager.getSlotsPerWorker().iterator().next();
-
-                               ResourceProfile tmCalculatedResourceProfile =
-                                       
TaskManagerServices.computeSlotResourceProfile(
-                                               
config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS),
-                                               
MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE)).getBytes());
-
-                               assertEquals(rmCalculatedResourceProfile, 
tmCalculatedResourceProfile);
-                       });
-               }};
-       }
 }

Reply via email to