brumi1024 commented on code in PR #6734:
URL: https://github.com/apache/hadoop/pull/6734#discussion_r1572661619


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java:
##########
@@ -0,0 +1,567 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class AbstractCGroupsHandler implements CGroupsHandler {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractCGroupsHandler.class);
+  protected static final String MTAB_FILE = "/proc/mounts";
+
+  private final long deleteCGroupTimeout;
+  private final long deleteCGroupDelay;
+  private final Clock clock;
+
+  protected final String mtabFile;
+  protected final CGroupsMountConfig cGroupsMountConfig;
+  protected final ReadWriteLock rwLock;
+  protected Map<CGroupController, String> controllerPaths;
+  protected Map<String, Set<String>> parsedMtab;
+  protected final PrivilegedOperationExecutor privilegedOperationExecutor;
+  protected final String cGroupPrefix;
+
+  /**
+   * Create cgroup handler object.
+   *
+   * @param conf                        configuration
+   * @param privilegedOperationExecutor provides mechanisms to execute
+   *                                    PrivilegedContainerOperations
+   * @param mtab                        mount file location
+   * @throws ResourceHandlerException if initialization failed
+   */
+  AbstractCGroupsHandler(Configuration conf, PrivilegedOperationExecutor
+      privilegedOperationExecutor, String mtab)
+      throws ResourceHandlerException {
+    // Remove leading and trialing slash(es)
+    this.cGroupPrefix = conf.get(YarnConfiguration.
+            NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn")
+        .replaceAll("^/+", "").replaceAll("/+$", "");
+    this.cGroupsMountConfig = new CGroupsMountConfig(conf);
+    this.deleteCGroupTimeout = conf.getLong(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
+        YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT) +
+        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + 1000;
+    this.deleteCGroupDelay =
+        conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY,
+            YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY);
+    this.controllerPaths = new HashMap<>();
+    this.parsedMtab = new HashMap<>();
+    this.rwLock = new ReentrantReadWriteLock();
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+    this.clock = SystemClock.getInstance();
+    mtabFile = mtab;
+    init();
+  }
+
+  /**
+   * Initialization hook invoked as the last step of the constructor. This
+   * implementation only calls {@code initializeControllerPaths()}.
+   *
+   * @throws ResourceHandlerException if the controller paths cannot be
+   *                                  initialized
+   */
+  protected void init() throws ResourceHandlerException {
+    initializeControllerPaths();
+  }
+
+  /**
+   * Looks up the path recorded for a controller while holding the read lock.
+   *
+   * @param controller the controller to look up
+   * @return the recorded path, or null if the map has no entry for it
+   */
+  @Override
+  public String getControllerPath(CGroupController controller) {
+    final String path;
+    rwLock.readLock().lock();
+    try {
+      path = controllerPaths.get(controller);
+    } finally {
+      rwLock.readLock().unlock();
+    }
+    return path;
+  }
+
+  /**
+   * Determines the mount table (from the pre-configured mount path when
+   * mounting is disabled but a mount path is defined, otherwise by parsing
+   * the mtab file), derives the controller paths from it, and then replaces
+   * both maps together under the write lock.
+   *
+   * @throws ResourceHandlerException if reading the mount information fails;
+   *                                  the underlying IOException is attached
+   *                                  as the cause
+   */
+  private void initializeControllerPaths() throws ResourceHandlerException {
+    // Cluster admins may have some subsystems mounted in specific locations
+    // We'll attempt to figure out mount points. We do this even if we plan
+    // to mount cgroups into our own tree to control the path permissions or
+    // to mount subsystems that are not mounted previously.
+    // The subsystems for new and existing mount points have to match, and
+    // the same hierarchy will be mounted at each mount point with the same
+    // subsystem set.
+
+    Map<String, Set<String>> newMtab = null;
+    Map<CGroupController, String> cPaths;
+    try {
+      if (this.cGroupsMountConfig.mountDisabledButMountPathDefined()) {
+        newMtab = parsePreConfiguredMountPath();
+      }
+
+      if (newMtab == null) {
+        // parse mtab
+        newMtab = parseMtab(mtabFile);
+      }
+
+      // find cgroup controller paths
+      cPaths = initializeControllerPathsFromMtab(newMtab);
+    } catch (IOException e) {
+      LOG.warn("Failed to initialize controller paths! Exception: " + e);
+      // Keep the IOException as the cause so the reason for the failure is
+      // not lost to callers that only see the ResourceHandlerException
+      throw new ResourceHandlerException(
+          "Failed to initialize controller paths!", e);
+    }
+
+    // we want to do a bulk update without the paths changing concurrently
+    rwLock.writeLock().lock();
+    try {
+      controllerPaths = cPaths;
+      parsedMtab = newMtab;
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Produces the mount table from the pre-configured mount path. It is
+   * consulted by initializeControllerPaths() only when mounting is disabled
+   * but a mount path is defined; a null result makes the caller fall back to
+   * parsing the mtab file.
+   *
+   * @return map used in place of the parsed mtab, or null to fall back
+   * @throws IOException reported by the caller as a failure to initialize
+   *                     the controller paths
+   */
+  protected abstract Map<String, Set<String>> parsePreConfiguredMountPath() 
throws IOException;
+
+  protected Map<CGroupController, String> initializeControllerPathsFromMtab(
+      Map<String, Set<String>> parsedMtab) throws ResourceHandlerException {
+    Map<CGroupController, String> ret = new HashMap<>();
+
+    for (CGroupController controller : CGroupController.values()) {
+      String subsystemName = controller.getName();
+      String controllerPath = findControllerInMtab(subsystemName, parsedMtab);
+
+      if (controllerPath != null) {
+        ret.put(controller, controllerPath);
+      }
+    }
+    return ret;
+  }
+
+  /* We are looking for entries of the form:
+   * none /cgroup/path/mem cgroup rw,memory 0 0
+   *
+   * Use a simple pattern that expects six fields, each separated by a single
+   * whitespace character, and captures the 2nd, 3rd, and 4th fields
+   * (read by parseMtab as mount path, type, and options).
+   */
+
+  private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
+      "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
+
+  /*
+   * Returns a map: path -> mount options
+   * for mounts with type "cgroup". Cgroup controllers will
+   * appear in the list of options for a path.
+   */
+  protected Map<String, Set<String>> parseMtab(String mtab)
+      throws IOException {
+    Map<String, Set<String>> ret = new HashMap<>();
+    BufferedReader in = null;
+
+    try {
+      FileInputStream fis = new FileInputStream(mtab);
+      in = new BufferedReader(new InputStreamReader(fis, 
StandardCharsets.UTF_8));
+
+      for (String str = in.readLine(); str != null;
+           str = in.readLine()) {
+        Matcher m = MTAB_FILE_FORMAT.matcher(str);
+        boolean mat = m.find();
+        if (mat) {
+          String path = m.group(1);
+          String type = m.group(2);
+          String options = m.group(3);
+
+          Set<String> controllerSet = handleMtabEntry(path, type, options);
+          if (controllerSet != null) {
+            ret.put(path, controllerSet);
+          }
+        }
+      }
+    } catch (IOException e) {
+      if (Shell.LINUX) {
+        throw new IOException("Error while reading " + mtab, e);
+      } else {
+        // Ignore the error, if we are running on an os other than Linux
+        LOG.warn("Error while reading " + mtab, e);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, in);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Interprets a single mount entry. parseMtab calls this for every line
+   * that matches MTAB_FILE_FORMAT and stores the returned set under the
+   * entry's path; a null result causes the entry to be skipped.
+   *
+   * @param path    2nd field of the mount line
+   * @param type    3rd field of the mount line
+   * @param options 4th field of the mount line
+   * @return the set to record for this path, or null to skip the entry
+   * @throws IOException handled by parseMtab like any other read error
+   */
+  protected abstract Set<String> handleMtabEntry(String path, String type, 
String options) throws IOException;
+
+  /**
+   * Find the hierarchy of the subsystem.
+   * The kernel ensures that a subsystem can only be part of a single 
hierarchy.
+   * The subsystem can be part of multiple mount points, if they belong to the
+   * same hierarchy.
+   *
+   * @param controller subsystem like cpu, cpuset, etc...
+   * @param entries    map of paths to mount options
+   * @return the first mount path that has the requested subsystem
+   */
+  protected String findControllerInMtab(String controller,
+                                        Map<String, Set<String>> entries) {
+    for (Map.Entry<String, Set<String>> e : entries.entrySet()) {
+      if (e.getValue().contains(controller)) {
+        if (new File(e.getKey()).canRead()) {
+          return e.getKey();
+        } else {
+          LOG.warn(String.format(
+              "Skipping inaccessible cgroup mount point %s", e.getKey()));
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Mounts the given controller. initializeCGroupController calls this only
+   * when mounting is enabled and the mount path is defined.
+   *
+   * @param controller the controller to mount
+   * @throws ResourceHandlerException presumably when the mount fails
+   *                                  (implementation-defined; confirm in
+   *                                  subclasses)
+   */
+  protected abstract void mountCGroupController(CGroupController controller) 
throws ResourceHandlerException;
+
+  /**
+   * Builds the cgroup path relative to the controller mount point: the
+   * configured cgroup prefix followed by the cgroup id.
+   *
+   * @param cGroupId id of the cgroup
+   * @return prefix and id joined by the path separator
+   */
+  @Override
+  public String getRelativePathForCGroup(String cGroupId) {
+    return String.join(Path.SEPARATOR, cGroupPrefix, cGroupId);
+  }
+
+  @Override
+  public String getPathForCGroup(CGroupController controller, String cGroupId) 
{
+    return getControllerPath(controller) + Path.SEPARATOR + cGroupPrefix
+        + Path.SEPARATOR + cGroupId;
+  }
+
+  /**
+   * Builds the path of the CGROUP_PROCS_FILE entry inside the given cgroup.
+   *
+   * @param controller controller the cgroup belongs to
+   * @param cGroupId   id of the cgroup
+   * @return the cgroup path followed by the separator and CGROUP_PROCS_FILE
+   */
+  @Override
+  public String getPathForCGroupTasks(CGroupController controller,
+                                      String cGroupId) {
+    final String cGroupPath = getPathForCGroup(controller, cGroupId);
+    return cGroupPath + Path.SEPARATOR + CGROUP_PROCS_FILE;
+  }
+
+  /**
+   * Builds the path of a controller parameter file inside the given cgroup.
+   * The file name has the form {@code <controller name>.<param>}.
+   *
+   * @param controller controller the cgroup belongs to
+   * @param cGroupId   id of the cgroup
+   * @param param      parameter name without the controller prefix
+   * @return the cgroup path followed by the separator and the file name
+   */
+  @Override
+  public String getPathForCGroupParam(CGroupController controller,
+                                      String cGroupId, String param) {
+    final String cGroupPath = getPathForCGroup(controller, cGroupId);
+    final String paramFileName = controller.getName() + "." + param;
+    return cGroupPath + Path.SEPARATOR + paramFileName;
+  }
+
+  /**
+   * Mount cgroup or use existing mount point based on configuration, then
+   * initialize the controller as a pre-mounted one.
+   *
+   * @param controller - the controller being initialized
+   * @throws ResourceHandlerException yarn hierarchy cannot be created or
+   *                                  accessed for any reason
+   */
+  @Override
+  public void initializeCGroupController(CGroupController controller) throws
+      ResourceHandlerException {
+    // The mount path is only checked when mounting is enabled
+    final boolean mountRequired = this.cGroupsMountConfig.isMountEnabled()
+        && cGroupsMountConfig.ensureMountPathIsDefined();
+    if (mountRequired) {
+      // We have a controller that needs to be mounted
+      mountCGroupController(controller);
+    }
+
+    // From here on the controller is treated as pre-mounted:
+    // make sure that YARN cgroup hierarchy path exists
+    initializePreMountedCGroupController(controller);
+  }
+
+  /**
+   * This function is called when the administrator opted
+   * to use a pre-mounted cgroup controller.
+   * There are two options.
+   * 1. YARN hierarchy already exists. We verify, whether we have write access
+   * in this case.
+   * 2. YARN hierarchy does not exist, yet. We create it in this case.

Review Comment:
   Added a reference to updateEnabledControllersInHierarchy and extended the 
existing javadoc there. Basically, it's an internal detail that YARN creates 
sub-cgroups named after the containerId once a container is launched, and the 
cgroup.subtree_control file controls which controllers are enabled directly 
below the current level (the docs are a bit hazy on this; these are the 
findings of my testing). Hence, to avoid the first few failures, we should 
proactively enable the current controllers in the level below, where the 
actual container limits are applied.
   
   Edited the logic a bit to:
   1. add the necessary + signs to the newly added controllers to enable them 
(they are removed automatically on save, hence there is no need to check for 
them)
   2. not overwrite the whole file, but add only the controllers that are 
missing. Added a test case for this as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to