agresch commented on a change in pull request #3366:
URL: https://github.com/apache/storm/pull/3366#discussion_r675165552



##########
File path: storm-core/src/native/worker-launcher/impl/utils/string-utils.h
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef __FreeBSD__
+#define _WITH_GETLINE
+#endif
+
+#ifndef _UTILS_STRING_UTILS_H_
+#define _UTILS_STRING_UTILS_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+typedef struct strbuf_struct {
+  char* buffer;               // points to beginning of the string
+  size_t length;              // strlen of buffer (sans trailing NUL)
+  size_t capacity;            // capacity of the buffer
+} strbuf;
+
+
+/*
+ * Validate whether the input is a valid container id.
+ * Returns true if valid, false otherwise.
+ */
+bool validate_container_id(const char* input);

Review comment:
       I would prefer that for cleanliness.

##########
File path: docs/OCI-support.md
##########
@@ -0,0 +1,1704 @@
+---
+title: OCI/Squashfs Runtime
+layout: documentation
+documentation: true
+---
+
+# OCI/Squashfs Runtime
+
+OCI/Squashfs is a container runtime that allows topologies to run inside docker containers. However, unlike the existing
+Docker runtime, images are fetched from HDFS rather than from the Docker registry, so images do not need to be pre-loaded
+into Docker on each node. Docker does not need to be installed on the nodes in order for this runtime to work.
+
+Note: This has only been tested on RHEL7.
+
+## Motivation
+
+#### Docker runtime drawbacks
+Using the current Docker runtime (see [Docker-support.md](Docker-support.md#Docker-Support)) has some drawbacks:
+
+##### Docker Daemons Dependency
+
+The Docker daemons `dockerd` and `containerd` must be running on the system in order for the Docker runtime to function.
+These daemons can also get out of sync, which can cause nontrivial issues for the containers.
+
+##### Docker Registry Issues at Scale
+
+Using the Docker runtime on a large-scale Storm cluster can overwhelm the Docker registry. In practice this requires
+admins to pre-load a Docker image on all the cluster nodes in a controlled fashion before a large job requesting
+the image can run.
+
+##### Image Costs in Time and Space
+
+Docker stores each image layer as a tar.gz archive. In order to use a layer, the compressed archive must be unpacked
+into the node's filesystem. This can consume significant disk space, especially when the reliable image store location
+capacity is relatively small. In addition, unpacking an image layer takes time, especially when the layer is large or
+contains thousands of files. This additional time for unpacking delays container launch beyond the time needed to transfer
+the layer data over the network.
+
+#### OCI/Squashfs Runtime advantages
+
+The OCI/Squashfs runtime avoids the drawbacks listed above in the following ways.
+
+##### No Docker Dependencies on the Node
+
+Docker does not need to be installed on each node, nor is there a dependency on a daemon or service that needs to be started
+by an admin before containers can be launched. All that is required on each node is an OCI-compatible runtime such as
+`runc`.
+
+##### Leverages Distributed File Systems for Scale
+
+Images can be fetched via HDFS or other distributed file systems instead of the Docker registry. This prevents a large cluster from
+overwhelming a Docker registry when a big topology causes all of the nodes to request an image at once. It also allows large clusters
+to run topologies more dynamically, as images do not need to be pre-loaded by admins on each node to prevent a storm of Docker registry
+image requests.
+
+##### Smaller, Faster Images on the Node
+
+The new runtime handles layer localization directly, so layer formats other than tar archives can be supported. For example, each image layer
+can be converted to a squashfs image as part of copying the layers to HDFS. squashfs is a file system optimized for running directly on a
+compressed image. With squashfs layers, the layer data can remain compressed on the node, saving disk space. Container launch after layer
+localization is also faster, as the layers no longer need to be unpacked into a directory to become usable.
+
+
+## Prerequisites
+
+First you need to use the `docker-to-squash.py` script to download docker images and configs, convert the layers to squashfs files, and put them in a directory in HDFS. For example:
+
+```bash
+python docker-to-squash.py pull-build-push-update --hdfs-root hdfs://hostname:port/containers \
+    docker.xxx.com:4443/hadoop-user-images/storm/rhel7:20201202-232133,storm/rhel7:dev_current \
+    --log DEBUG --bootstrap
+```
+
+With this command, all the layers belonging to this image will be converted to squashfs files and placed under the `./layers` directory;
+the manifest of this image will be placed under the `./manifests` directory, named with the sha256 value of the manifest content;
+the config of this image will be placed under the `./config` directory, named with the sha256 value of the config content;
+and the mapping from the image tag to the sha256 value of the manifest will be written to the `./image-tag-to-manifest-file`.
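The content-addressed naming above (manifest and config files named by the sha256 of their content) can be reproduced with a one-line sketch; the helper name below is mine, not from the script:

```python
import hashlib

def content_address(data: bytes) -> str:
    """Return the sha256 hex digest used to name a manifest/config file in HDFS."""
    return hashlib.sha256(data).hexdigest()
```

For example, `content_address(manifest_bytes)` would yield the file name under `./manifests`.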
+
+##### Example
+
+The resulting directory structure in HDFS looks like this:
+
+```bash
+-bash-4.2$ hdfs dfs -ls /containers/*
+Found 1 items
+-r--r--r--   3 hdfsqa hadoop       7877 2020-12-04 14:29 /containers/config/ef1ff2c7167a1a6cd01e106f51b84a4d400611ba971c53cbc28de7919515ca4e
+-r--r--r--   3 hdfsqa hadoop        160 2020-12-04 14:30 /containers/image-tag-to-hash
+Found 7 items
+-r--r--r--   3 hdfsqa hadoop   84697088 2020-12-04 14:28 /containers/layers/152ee1d2cccea9dfe6393d2bdf9d077b67616b2b417b25eb74fc5ffaadcb96f5.sqsh
+-r--r--r--   3 hdfsqa hadoop  545267712 2020-12-04 14:28 /containers/layers/18ee671016a1bf3ecab07395d93c2cbecd352d59c497a1551e2074d64e1098d9.sqsh
+-r--r--r--   3 hdfsqa hadoop   12906496 2020-10-06 15:24 /containers/layers/1b73e9433ecca0a6bb152bd7525f2b7c233484d51c24f8a6ba483d5cfd3035dc.sqsh
+-r--r--r--   3 hdfsqa hadoop       4096 2020-12-04 14:29 /containers/layers/344224962010c03c9ca1f11a9bff0dfcc296ac46d0a55e4ff30a0ad13b9817af.sqsh
+-r--r--r--   3 hdfsqa hadoop   26091520 2020-10-06 15:22 /containers/layers/3692c3483ef6516fba685b316448e8aaf0fc10bb66818116edc8e5e6800076c7.sqsh
+-r--r--r--   3 hdfsqa hadoop       4096 2020-12-04 14:29 /containers/layers/8710a3d72f75b45c48ab6b9b67eb6d77caea3dac91a0c30e0831f591cba4887e.sqsh
+-r--r--r--   3 hdfsqa hadoop  121122816 2020-10-06 15:23 /containers/layers/ea067172a7138f035d89a5c378db6d66c1581d98b0497b21f256e04c3d2b5303.sqsh
+Found 1 items
+-r--r--r--   3 hdfsqa hadoop       1793 2020-12-04 14:29 /containers/manifests/26fd443859325d5911f3be5c5e231dddca88ee0d526456c0c92dd794148d8585
+```
+
+The `image-tag-to-hash` file:
+```bash
+-bash-4.2$ hdfs dfs -cat /containers/image-tag-to-hash
+storm/rhel7:dev_current:26fd443859325d5911f3be5c5e231dddca88ee0d526456c0c92dd794148d8585#docker.xxx.com:4443/hadoop-user-images/storm/rhel7:20201202-232133
+```
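Based on the example line above, each entry appears to have the shape `<image-tag>:<manifest-sha256>#<source-image>`. A sketch of parsing such a line (the function name and the exact format assumptions are mine, not taken from the Storm code):

```python
def parse_tag_to_hash(line):
    """Split one image-tag-to-hash entry into (tag, manifest_hash, source).

    Assumed format: <tag>:<manifest-sha256>#<source-image-reference>
    The manifest hash is the last colon-separated field before '#'.
    """
    entry, _, source = line.partition("#")
    tag, _, manifest_hash = entry.rpartition(":")
    return tag, manifest_hash, source
```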
+
+The manifest file `26fd443859325d5911f3be5c5e231dddca88ee0d526456c0c92dd794148d8585`:
+```json
+{
+  "schemaVersion": 2,
+  "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
+  "config": {
+    "mediaType": "application/vnd.docker.container.image.v1+json",
+    "size": 7877,
+    "digest": "sha256:ef1ff2c7167a1a6cd01e106f51b84a4d400611ba971c53cbc28de7919515ca4e"
+  },
+  "layers": [
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 26858854,
+      "digest": "sha256:3692c3483ef6516fba685b316448e8aaf0fc10bb66818116edc8e5e6800076c7"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 123300113,
+      "digest": "sha256:ea067172a7138f035d89a5c378db6d66c1581d98b0497b21f256e04c3d2b5303"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 12927624,
+      "digest": "sha256:1b73e9433ecca0a6bb152bd7525f2b7c233484d51c24f8a6ba483d5cfd3035dc"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 567401434,
+      "digest": "sha256:18ee671016a1bf3ecab07395d93c2cbecd352d59c497a1551e2074d64e1098d9"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 85748864,
+      "digest": "sha256:152ee1d2cccea9dfe6393d2bdf9d077b67616b2b417b25eb74fc5ffaadcb96f5"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 186,
+      "digest": "sha256:344224962010c03c9ca1f11a9bff0dfcc296ac46d0a55e4ff30a0ad13b9817af"
+    },
+    {
+      "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
+      "size": 156,
+      "digest": "sha256:8710a3d72f75b45c48ab6b9b67eb6d77caea3dac91a0c30e0831f591cba4887e"
+    }
+  ]
+}
+```
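Given a manifest like the one above, the localized squashfs layer file names are simply the layer digests plus a `.sqsh` suffix (compare the `/containers/layers/` listing earlier). A small sketch, assuming the manifest JSON text is already in hand; the helper name is hypothetical:

```python
import json

def layer_sqsh_names(manifest_text):
    """Map each layer digest in a manifest to its <hex>.sqsh file name."""
    manifest = json.loads(manifest_text)
    # Drop the "sha256:" prefix; localized layers are named <hex-digest>.sqsh
    return [layer["digest"].split(":", 1)[1] + ".sqsh" for layer in manifest["layers"]]
```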
+
+And the config file `ef1ff2c7167a1a6cd01e106f51b84a4d400611ba971c53cbc28de7919515ca4e` (some of the content is omitted):
+```json
+{
+  "architecture": "amd64",
+  "config": {
+    "Hostname": "",
+    "Domainname": "",
+    "User": "root",
+    "AttachStdin": false,
+    "AttachStdout": false,
+    "AttachStderr": false,
+    "Tty": false,
+    "OpenStdin": false,
+    "StdinOnce": false,
+    "Env": [
+      "X_SCLS=rh-git218",
+      "LD_LIBRARY_PATH=/opt/rh/httpd24/root/usr/lib64",
+      "PATH=/opt/rh/rh-git218/root/usr/bin:/home/y/bin64:/home/y/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/y/share/yjava_jdk/java/bin",
+      "PERL5LIB=/opt/rh/rh-git218/root/usr/share/perl5/vendor_perl",
+      "LANG=en_US.UTF-8",
+      "LANGUAGE=en_US:en",
+      "LC_ALL=en_US.UTF-8",
+      "JAVA_HOME=/home/y/share/yjava_jdk/java"
+    ],
+    "Cmd": [
+      "/bin/bash"
+    ],
+    "Image": "sha256:6977cd0735c96d14248e834f775373e40230c134b70f10163c05ce6c6c8873ca",
+    "Volumes": null,
+    "WorkingDir": "",
+    "Entrypoint": null,
+    "OnBuild": null,
+    "Labels": {
+      "name": "xxxxx"
+    }
+  },
+  "container": "344ff1084dea3e0501a0d426e52c43cd589d6b29f33ab0915b7be8906b9aec41",
+  "container_config": {
+    "Hostname": "344ff1084dea",
+    "Domainname": "",
+    "User": "root",
+    "AttachStdin": false,
+    "AttachStdout": false,
+    "AttachStderr": false,
+    "Tty": false,
+    "OpenStdin": false,
+    "StdinOnce": false,
+    "Env": [
+      "X_SCLS=rh-git218",
+      "LD_LIBRARY_PATH=/opt/rh/httpd24/root/usr/lib64",
+      "PATH=/opt/rh/rh-git218/root/usr/bin:/home/y/bin64:/home/y/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/y/share/yjava_jdk/java/bin",
+      "PERL5LIB=/opt/rh/rh-git218/root/usr/share/perl5/vendor_perl",
+      "LANG=en_US.UTF-8",
+      "LANGUAGE=en_US:en",
+      "LC_ALL=en_US.UTF-8",
+      "JAVA_HOME=/home/y/share/yjava_jdk/java"
+    ],
+    "Cmd": [
+      "/bin/sh",
+      "-c"
+    ],
+    "Image": "sha256:6977cd0735c96d14248e834f775373e40230c134b70f10163c05ce6c6c8873ca",
+    "Volumes": null,
+    "WorkingDir": "",
+    "Entrypoint": null,
+    "OnBuild": null,
+    "Labels": {
+      "name": "xxxxx"
+    }
+  },
+  "created": "2020-12-02T23:25:47.354704574Z",
+  "docker_version": "19.03.8",
+  "history": [
+    {
+      "created": "2020-02-18T21:43:36.934503462Z",
+      "created_by": "/bin/sh"
+    },
+    {
+      "created": "2020-02-18T21:45:05.729764427Z",
+      "created_by": "/bin/sh"
+    },
+    {
+      "created": "2020-02-18T21:46:36.638896031Z",
+      "created_by": "/bin/sh"
+    },
+    {
+      "created": "2020-12-02T23:21:54.595662813Z",
+      "created_by": "/bin/sh -c #(nop)  USER root",
+      "empty_layer": true
+    },
+    {
+      "created": "2020-12-02T23:25:45.822235539Z",
+      "created_by": "/bin/sh -c /opt/python/bin/pip3.6 install --no-cache-dir numpy scipy pandas requests setuptools scikit-learn matplotlib"
+    },
+    {
+      "created": "2020-12-02T23:25:46.708884538Z",
+      "created_by": "/bin/sh -c #(nop)  ENV JAVA_HOME=/home/y/share/yjava_jdk/java",
+      "empty_layer": true
+    },
+    {
+      "created": "2020-12-02T23:25:46.770226108Z",
+      "created_by": "/bin/sh -c #(nop)  ENV PATH=/opt/rh/rh-git218/root/usr/bin:/home/y/bin64:/home/y/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/y/share/yjava_jdk/java/bin",
+      "empty_layer": true
+    },
+    {
+      "created": "2020-12-02T23:25:46.837263533Z",
+      "created_by": "/bin/sh -c #(nop) COPY file:33283617fbd796b25e53eaf4d26012eea1f610ff9acc0706f11281e86be440dc in /etc/krb5.conf "
+    },
+    {
+      "created": "2020-12-02T23:25:47.237515768Z",
+      "created_by": "/bin/sh -c echo '7.7.4' \u003e /etc/hadoop-dockerfile-version"
+    }
+  ],
+  "os": "linux",
+  "rootfs": {
+    "type": "layers",
+    "diff_ids": [
+      "sha256:9f627fdb0292afbe5e2eb96edc1b3a5d3a8f468e3acf1d29f1509509285c7341",
+      "sha256:83d2667f9458eaf719588a96bb63f2520bd377d29d52f6dbd4ff13c819c08037",
+      "sha256:fcba5f49eef4f3d77d3e73e499a1a4e1914b3f20d903625d27c0aa3ab82f41a3",
+      "sha256:3bd4567d0726f5d6560b548bc0c0400e868f6a27067887a36edd7e8ceafff96c",
+      "sha256:ad56900a1f10e6ef96f17c7e8019384540ab1b34ccce6bda06675473b08d787e",
+      "sha256:ac0a645609f957ab9c4a8a62f8646e99f09a74ada54ed2eaca204c6e183c9ae8",
+      "sha256:9bf10102fc145156f4081c2cacdbadab5816dce4f88eb02881ab739239d316e6"
+    ]
+  }
+}
+```
+
+Note: To use `docker-to-squash.py`, you need to install [skopeo](https://github.com/containers/skopeo), [jq](https://stedolan.github.io/jq/) and squashfs-tools.
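A quick way to verify these prerequisites before running the script; this checker is my own sketch (note that `squashfs-tools` provides the `mksquashfs`/`unsquashfs` binaries):

```python
import shutil

def missing_tools(tools=("skopeo", "jq", "mksquashfs")):
    """Return the subset of required command-line tools not found on PATH."""
    return [t for t in tools if shutil.which(t) is None]

# Example: fail fast with a clear message if anything is missing.
# missing = missing_tools()
# if missing:
#     raise SystemExit("please install: " + ", ".join(missing))
```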
+
+
+## Configurations
+
+Then you need to set up Storm with the following configs:
+
+| Setting | Description |
+|---------|-------------|
+| `storm.resource.isolation.plugin.enable` | Set to `true` to enable the isolation plugin. `storm.resource.isolation.plugin` determines which plugin to use. If this is set to `false`, `org.apache.storm.container.DefaultResourceIsolationManager` will be used. |
+| `storm.resource.isolation.plugin` | Set to `"org.apache.storm.container.oci.RuncLibContainerManager"` to enable OCI/Squashfs runtime support. |
+| `storm.oci.allowed.images` | A whitelist of docker images that can be used. Users can only choose a docker image from the list. |

Review comment:
       can we update this config description to allowlist?

##########
File path: storm-server/src/main/java/org/apache/storm/container/oci/RuncLibContainerManager.java
##########
@@ -0,0 +1,631 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.storm.container.oci;
+
+import static org.apache.storm.utils.ConfigUtils.FILE_SEPARATOR;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.StormTimer;
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.container.oci.OciContainerExecutorConfig.OciLayer;
+import org.apache.storm.container.oci.OciContainerExecutorConfig.OciRuntimeConfig;
+import org.apache.storm.container.oci.OciContainerExecutorConfig.OciRuntimeConfig.OciLinuxConfig;
+import org.apache.storm.container.oci.OciContainerExecutorConfig.OciRuntimeConfig.OciMount;
+import org.apache.storm.container.oci.OciContainerExecutorConfig.OciRuntimeConfig.OciProcessConfig;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.ExitCodeCallback;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.Utils;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+public class RuncLibContainerManager extends OciContainerManager {
+    private static final Logger LOG = LoggerFactory.getLogger(RuncLibContainerManager.class);
+
+    private OciImageTagToManifestPluginInterface imageTagToManifestPlugin;
+    private OciManifestToResourcesPluginInterface manifestToResourcesPlugin;
+    private OciResourcesLocalizerInterface ociResourcesLocalizer;
+    private ObjectMapper mapper;
+    private int layersToKeep;
+    private String seccomp;
+
+    private static final String RESOLV_CONF = "/etc/resolv.conf";
+    private static final String HOSTNAME = "/etc/hostname";
+    private static final String HOSTS = "/etc/hosts";
+    private static final String OCI_CONFIG_JSON = "oci-config.json";
+
+    private static final String SQUASHFS_MEDIA_TYPE = "application/vnd.squashfs";
+
+    //CPU CFS (Completely Fair Scheduler) period
+    private static final long CPU_CFS_PERIOD_US = 100000;
+
+    private Map<String, Long> workerToContainerPid = new ConcurrentHashMap<>();
+    private Map<String, ExitCodeCallback> workerToExitCallback = new ConcurrentHashMap<>();
+    private Map<String, String> workerToUser = new ConcurrentHashMap<>();
+    private StormTimer checkContainerAliveTimer;
+
+    @Override
+    public void prepare(Map<String, Object> conf) throws IOException {
+        super.prepare(conf);
+
+        imageTagToManifestPlugin = chooseImageTagToManifestPlugin();
+        imageTagToManifestPlugin.init(conf);
+
+        manifestToResourcesPlugin = chooseManifestToResourcesPlugin();
+        manifestToResourcesPlugin.init(conf);
+
+        ociResourcesLocalizer = chooseOciResourcesLocalizer();
+        ociResourcesLocalizer.init(conf);
+
+        layersToKeep = ObjectReader.getInt(
+                conf.get(DaemonConfig.STORM_OCI_LAYER_MOUNTS_TO_KEEP),
+                100
+        );
+
+        mapper = new ObjectMapper();
+
+        if (seccompJsonFile != null) {
+            seccomp = new String(Files.readAllBytes(Paths.get(seccompJsonFile)));
+        }
+
+        if (checkContainerAliveTimer == null) {
+            checkContainerAliveTimer =
+                new StormTimer("CheckRuncContainerAlive", Utils.createDefaultUncaughtExceptionHandler());
+            checkContainerAliveTimer
+                .scheduleRecurring(0, (Integer) conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), () -> {
+                    try {
+                        checkContainersAlive();
+                    } catch (Exception e) {
+                        //Ignore
+                        LOG.debug("The CheckRuncContainerAlive thread has exception. Ignored", e);
+                    }
+                });
+        }
+    }
+
+    private OciImageTagToManifestPluginInterface chooseImageTagToManifestPlugin() throws IllegalArgumentException {
+        String pluginName = ObjectReader.getString(
+                conf.get(DaemonConfig.STORM_OCI_IMAGE_TAG_TO_MANIFEST_PLUGIN)
+        );
+        LOG.info("imageTag-to-manifest Plugin is: {}", pluginName);
+        return ReflectionUtils.newInstance(pluginName);
+    }
+
+    private OciManifestToResourcesPluginInterface chooseManifestToResourcesPlugin() throws IllegalArgumentException {
+        String pluginName = ObjectReader.getString(
+                conf.get(DaemonConfig.STORM_OCI_MANIFEST_TO_RESOURCES_PLUGIN)
+        );
+        LOG.info("manifest to resource Plugin is: {}", pluginName);
+        return ReflectionUtils.newInstance(pluginName);
+    }
+
+    private OciResourcesLocalizerInterface chooseOciResourcesLocalizer()
+        throws IllegalArgumentException {
+        String pluginName = ObjectReader.getString(
+                conf.get(DaemonConfig.STORM_OCI_RESOURCES_LOCALIZER)
+        );
+        LOG.info("oci resource localizer is: {}", pluginName);
+        return ReflectionUtils.newInstance(pluginName);
+    }
+
+    //the container process ID in the process namespace of the host.
+    private String containerPidFile(String workerId) {
+        return ConfigUtils.workerArtifactsSymlink(conf, workerId) + FILE_SEPARATOR + "container-" + workerId + ".pid";
+    }
+
+    @Override
+    public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
+                                    int port, String workerId,
+                                    List<String> command, Map<String, String> env, String logPrefix,
+                                    ExitCodeCallback processExitCallback, File targetDir) throws IOException {
+
+        String imageName = getImageName(topoConf);
+        if (imageName == null) {
+            LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
+            return;
+        }
+
+        //set container ID to port + worker ID
+        String containerId = getContainerId(workerId, port);
+
+        //get manifest
+        ImageManifest manifest = imageTagToManifestPlugin.getManifestFromImageTag(imageName);
+        LOG.debug("workerId {}: Got manifest: {}", workerId, manifest.toString());
+
+        //get layers metadata
+        OciResource configResource = manifestToResourcesPlugin.getConfigResource(manifest);
+        LOG.info("workerId {}: Got config metadata: {}", workerId, configResource.toString());
+
+        saveRuncYaml(topologyId, port, containerId, imageName, configResource);
+
+        List<OciResource> layersResource = manifestToResourcesPlugin.getLayerResources(manifest);
+        LOG.info("workerId {}: Got layers metadata: {}", workerId, layersResource.toString());
+
+        //localize resource
+        String configLocalPath = ociResourcesLocalizer.localize(configResource);
+
+        List<String> ociEnv = new ArrayList<>();
+        List<String> args = new ArrayList<>();
+
+        ArrayList<OciLayer> layers = new ArrayList<>();
+
+        File file = new File(configLocalPath);
+        //extract env
+        List<String> imageEnv = extractImageEnv(file);
+        if (imageEnv != null && !imageEnv.isEmpty()) {
+            ociEnv.addAll(imageEnv);
+        }
+        for (Map.Entry<String, String> entry : env.entrySet()) {
+            ociEnv.add(entry.getKey() + "=" + entry.getValue());
+        }
+        LOG.debug("workerId {}: ociEnv: {}", workerId, ociEnv);
+
+        //extract entrypoint
+        List<String> entrypoint = extractImageEntrypoint(file);
+        if (entrypoint != null && !entrypoint.isEmpty()) {
+            args.addAll(entrypoint);
+        }
+        LOG.debug("workerId {}: args: {}", workerId, args);
+
+        //localize layers
+        List<String> layersLocalPath = ociResourcesLocalizer.localize(layersResource);
+        //compose layers
+        for (String layerLocalPath : layersLocalPath) {
+            OciLayer layer = new OciLayer(SQUASHFS_MEDIA_TYPE, layerLocalPath);
+            layers.add(layer);
+        }
+        LOG.debug("workerId {}: layers: {}", workerId, layers);
+        ArrayList<OciMount> mounts = new ArrayList<>();
+        setContainerMounts(mounts, topologyId, workerId, port);
+        LOG.debug("workerId {}: mounts: {}", workerId, mounts);
+
+        //calculate the cpusQuotas based on CPU_CFS_PERIOD and assigned CPU
+        Long cpusQuotas = null;
+        if (workerToCpu.containsKey(workerId)) {
+            cpusQuotas = workerToCpu.get(workerId) * CPU_CFS_PERIOD_US / 100;
+        }
+
+        Long memoryInBytes = null;
+        if (workerToMemoryMb.containsKey(workerId)) {
+            memoryInBytes = workerToMemoryMb.get(workerId) * 1024 * 1024L;
+        }
+        LOG.info("workerId {}: memoryInBytes set to {}; cpusQuotas set to {}", workerId, memoryInBytes, cpusQuotas);
+
+        //<workerRoot>/<workerId>
+        String workerDir = targetDir.getAbsolutePath();
+        String workerScriptPath = ServerUtils.writeScript(workerDir, command, env, "0027");
+
+        args.add("bash");
+        args.add(workerScriptPath);
+
+        //The container PID (on the host) will be written to this file.
+        String containerPidFilePath = containerPidFile(workerId);
+
+        OciProcessConfig processConfig = createOciProcessConfig(workerDir, ociEnv, args);
+
+        OciLinuxConfig linuxConfig =
+            createOciLinuxConfig(cpusQuotas, memoryInBytes, cgroupParent + "/" + containerId, seccomp, workerId);
+
+        OciRuntimeConfig ociRuntimeConfig = new OciRuntimeConfig(null, mounts, processConfig, null,
+                                                                 null, null, linuxConfig);
+
+        OciContainerExecutorConfig ociContainerExecutorConfig =
+            createOciContainerExecutorConfig(user, containerId, containerPidFilePath,
+                                             workerScriptPath, null, null, null, layers, ociRuntimeConfig);
+
+        //launch the container using worker-launcher
+        String executorConfigToJsonFile = writeOciExecutorConfigToJsonFile(mapper, ociContainerExecutorConfig, workerDir);
+        LOG.info("workerId {}: oci-config.json file path: {}", workerId, executorConfigToJsonFile);
+
+        List<String> cmdArgs = Arrays.asList(CmdType.RUN_OCI_CONTAINER.toString(), workerDir, executorConfigToJsonFile,
+                                             ConfigUtils.workerArtifactsSymlink(conf, workerId));
+
+        // launch the oci container. waiting prevents a possible race condition that could prevent cleanup of the container
+        int exitCode = ClientSupervisorUtils.processLauncherAndWait(conf, user, cmdArgs, env, logPrefix, targetDir);
+        if (exitCode != 0) {
+            LOG.error("launchWorkerProcess RuncCommand {} exited with code: {}", "LaunchWorker-" + containerId, exitCode);
+            throw new RuntimeException("launchWorkerProcess Failed to create Runc Container. ContainerId: " + containerId);
+        }
+        }
+
+        //Add to the watched list
+        LOG.debug("Adding {} to the watched workers list", workerId);
+        workerToExitCallback.put(workerId, processExitCallback);
+        workerToUser.put(workerId, user);
+
+    }
+
+    private void checkContainersAlive() {
+        //Check if all watched workers are still alive
+        workerToUser.forEach((workerId, user) -> {
+            if (isContainerDead(workerId, user)) {
+                invokeProcessExitCallback(workerId);
+            }
+        });
+    }
+
+    private boolean isContainerDead(String workerId, String user) {
+        boolean isDead = true;
+        Long pid = getContainerPid(workerId);
+        LOG.debug("Checking container {}, pid {}, user {}", workerId, pid, user);
+        //do nothing if pid is null.
+        if (pid != null && user != null) {
+            try {
+                isDead = ServerUtils.areAllProcessesDead(conf, user, workerId, Collections.singleton(pid));
+            } catch (IOException e) {
+                //ignore
+                LOG.debug("Error while checking if container is dead.", e);
+            }
+        }
+        return isDead;
+    }
+
+    private void invokeProcessExitCallback(String workerId) {
+        LOG.info("processExitCallback returned for workerId {}", workerId);
+        ExitCodeCallback processExitCallback = workerToExitCallback.get(workerId);
+        if (processExitCallback != null) {
+            processExitCallback.call(0);
+        }
+    }
+
+    private String getContainerId(String workerId, int port) throws IOException {
+        if (port <= 0) { // when killing workers, we will have the workerId and a port of -1
+            return getContainerIdFromOciJson(workerId);
+        }
+        return port + "-" + workerId;
+    }
+
+    private String getContainerIdFromOciJson(String workerId) throws 
IOException {
+        String ociJson = ConfigUtils.workerRoot(conf, workerId) + 
FILE_SEPARATOR + OCI_CONFIG_JSON;
+        LOG.info("port unknown for workerId {}, looking up from {}", workerId, 
ociJson);
+        JSONParser parser = new JSONParser();
+
+        try (Reader reader = new FileReader(ociJson)) {
+            JSONObject jsonObject = (JSONObject) parser.parse(reader);
+            return (String) jsonObject.get("containerId");
+        } catch (ParseException e) {
+            throw new IOException("Unable to parse " + ociJson, e);
+        }
+    }
+
+    // save runc.yaml in artifacts dir so we can track which image the worker was launched with
+    private void saveRuncYaml(String topologyId, int port, String containerId, String imageName, OciResource configResource) {
+        String fname = String.format("runc-%s.yaml", containerId);
+        File file = new File(ConfigUtils.workerArtifactsRoot(conf, topologyId, port), fname);
+        DumperOptions options = new DumperOptions();
+        options.setIndent(2);
+        options.setPrettyFlow(true);
+        options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+        Yaml yaml = new Yaml(options);
+        Map<String, Object> data = new HashMap<>();
+        data.put("imageName", imageName);
+        data.put("manifest", configResource.getFileName());
+        data.put("configPath", configResource.getPath());
+        try (Writer writer = new FileWriter(file)) {
+            yaml.dump(data, writer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String writeOciExecutorConfigToJsonFile(ObjectMapper mapper, OciContainerExecutorConfig ociContainerExecutorConfig,
+                                                    String workerDir) throws IOException {
+        File cmdDir = new File(workerDir);
+        if (!cmdDir.exists()) {
+            throw new IOException(workerDir + " doesn't exist");
+        }
+
+        File commandFile = new File(cmdDir + FILE_SEPARATOR + OCI_CONFIG_JSON);
+        mapper.writeValue(commandFile, ociContainerExecutorConfig);
+        return commandFile.getAbsolutePath();
+    }
+
+    private void setContainerMounts(ArrayList<OciMount> mounts, String topologyId, String workerId, Integer port) throws IOException {
+        //read-only bindmounts need to be added before read-write bindmounts otherwise read-write bindmounts may be overridden.
+        for (String readonlyMount : readonlyBindmounts) {
+            addOciMountLocation(mounts, readonlyMount, readonlyMount, false, false);
+        }
+
+        for (String readwriteMount : readwriteBindmounts) {
+            addOciMountLocation(mounts, readwriteMount, readwriteMount, false, true);
+        }
+
+        addOciMountLocation(mounts, RESOLV_CONF, RESOLV_CONF, false, false);
+        addOciMountLocation(mounts, HOSTNAME, HOSTNAME, false, false);
+        addOciMountLocation(mounts, HOSTS, HOSTS, false, false);
+        addOciMountLocation(mounts, nscdPath, nscdPath, false, false);
+        addOciMountLocation(mounts, stormHome, stormHome, false, false);
+        addOciMountLocation(mounts, cgroupRootPath, cgroupRootPath, false, false);
+
+        //set of locations to be bind mounted
+        String supervisorLocalDir = ConfigUtils.supervisorLocalDir(conf);
+        addOciMountLocation(mounts, supervisorLocalDir, supervisorLocalDir, false, false);
+
+        String workerRootDir = ConfigUtils.workerRoot(conf, workerId);
+        addOciMountLocation(mounts, workerRootDir, workerRootDir, false, true);
+
+        String workerArtifactsRoot = ConfigUtils.workerArtifactsRoot(conf, topologyId, port);
+        addOciMountLocation(mounts, workerArtifactsRoot, workerArtifactsRoot, false, true);
+
+        String workerUserFile = ConfigUtils.workerUserFile(conf, workerId);
+        addOciMountLocation(mounts, workerUserFile, workerUserFile, false, true);
+
+        String sharedByTopologyDir = ConfigUtils.sharedByTopologyDir(conf, topologyId);
+        addOciMountLocation(mounts, sharedByTopologyDir, sharedByTopologyDir, false, true);
+
+        String workerTmpRoot = ConfigUtils.workerTmpRoot(conf, workerId);
+        addOciMountLocation(mounts, workerTmpRoot, TMP_DIR, false, true);
+    }
+
+    private List<String> extractImageEnv(File config) throws IOException {
+        JsonNode node = mapper.readTree(config);
+        JsonNode envNode = node.path("config").path("Env");
+        if (envNode.isMissingNode()) {
+            return null;
+        }
+        return mapper.treeToValue(envNode, List.class);
+    }
+
+    private List<String> extractImageEntrypoint(File config) throws IOException {
+        JsonNode node = mapper.readTree(config);
+        JsonNode entrypointNode = node.path("config").path("Entrypoint");
+        if (entrypointNode.isMissingNode()) {
+            return null;
+        }
+        return mapper.treeToValue(entrypointNode, List.class);
+    }
+
+    private OciContainerExecutorConfig createOciContainerExecutorConfig(
+            String username, String containerId, String pidFile,
+            String containerScriptPath, String containerCredentialsPath,
+            List<String> localDirs, List<String> logDirs,
+            List<OciLayer> layers, OciRuntimeConfig ociRuntimeConfig) {
+
+        return new OciContainerExecutorConfig(username, containerId,
+                pidFile, containerScriptPath, containerCredentialsPath,
+                localDirs, logDirs, layers, layersToKeep, ociRuntimeConfig);
+    }
+
+    private OciProcessConfig createOciProcessConfig(String cwd,
+                                                    List<String> env, List<String> args) {
+        return new OciProcessConfig(false, null, cwd, env,
+                args, null, null, null, true, 0, null, null);
+    }
+
+    private OciLinuxConfig createOciLinuxConfig(Long cpusQuotas, Long memInBytes,
+                                                String cgroupsPath, String seccomp, String workerId) {
+        OciLinuxConfig.Resources.Cpu cgroupCpu = null;
+
+        if (cpusQuotas != null) {
+            cgroupCpu = new OciLinuxConfig.Resources.Cpu(0, cpusQuotas, CPU_CFS_PERIOD_US, 0, 0,
+                    null, null);
+
+            if (workerToCores.containsKey(workerId)) {
+                cgroupCpu.setCpus(StringUtils.join(workerToCores.get(workerId), ","));
+                cgroupCpu.setMems(workerToMemoryZone.get(workerId));
+            }
+        }
+
+        OciLinuxConfig.Resources.Memory cgroupMem = null;
+        if (memInBytes != null) {
+            cgroupMem = new OciLinuxConfig.Resources.Memory(memInBytes, 0, 0, 0, 0, 0, false);
+        }
+
+        OciLinuxConfig.Resources cgroupResources =
+                new OciLinuxConfig.Resources(null, cgroupMem, cgroupCpu, null, null, null,
+                        null, null);
+
+        return new OciLinuxConfig(null, null, null, null,
+                cgroupsPath, cgroupResources, null, null, seccomp, null, null,
+                null, null);
+    }
+
+    private void addOciMountLocation(List<OciMount> mounts, String srcPath,
+                                     String dstPath, boolean createSource, boolean isReadWrite) throws IOException {
+        if (!createSource) {
+            boolean sourceExists = new File(srcPath).exists();
+            if (!sourceExists) {
+                throw new IOException("SourcePath " + srcPath + " doesn't exist");
+            }
+        }
+
+        ArrayList<String> options = new ArrayList<>();
+        if (isReadWrite) {
+            options.add("rw");
+        } else {
+            options.add("ro");
+        }
+        options.add("rbind");
+        options.add("rprivate");
+        mounts.add(new OciMount(dstPath, "bind", srcPath, options));
+    }
+
+    @Override
+    public long getMemoryUsage(String user, String workerId, int port) throws IOException {
+        // "/sys/fs/cgroup/memory/storm/containerId/"
+        String containerId = getContainerId(workerId, port);
+        String memoryCgroupPath = memoryCgroupRootPath + File.separator + containerId;
+        MemoryCore memoryCore = new MemoryCore(memoryCgroupPath);
+        LOG.debug("ContainerId {} : Got memory getPhysicalUsage {} from {}", containerId, memoryCore.getPhysicalUsage(), memoryCgroupPath);
+        return memoryCore.getPhysicalUsage();
+    }
+
+    @Override
+    public void kill(String user, String workerId) throws IOException {
+        LOG.info("Killing {}", workerId);
+        Long pid = getContainerPid(workerId);
+        if (pid != null) {
+            signal(pid, 15, user);
+        } else {
+            LOG.warn("Trying to kill container {} but pidfile is not found", workerId);
+        }
+    }
+
+    private void signal(long pid, int signal, String user) throws IOException {
+        List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
+        String logPrefix = "kill -" + signal + " " + pid;
+        ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+    }
+
+    @Override
+    public void forceKill(String user, String workerId) throws IOException {
+        LOG.debug("ForceKilling {}", workerId);
+        Long pid = getContainerPid(workerId);
+        if (pid != null) {
+            signal(pid, 9, user);
+        } else {
+            LOG.warn("Trying to forceKill container {} but pidfile is not found", workerId);
+        }
+    }
+
+    // return null if not found.
+    private Long getContainerPid(String workerId) {
+        Long pid = workerToContainerPid.get(workerId);
+        if (pid == null) {
+            String containerPidFilePath = containerPidFile(workerId);
+            if (!new File(containerPidFilePath).exists()) {
+                LOG.warn("{} doesn't exist", containerPidFilePath);
+            } else {
+                try {
+                    pid = Long.parseLong(CgroupUtils.readFileByLine(containerPidFilePath).get(0));
+                    workerToContainerPid.put(workerId, pid);
+                } catch (IOException e) {
+                    LOG.warn("Failed to read {}", containerPidFilePath);
+                }
+            }
+        }
+        return pid;
+    }
+
+    @Override
+    public void releaseResourcesForWorker(String workerId) {
+        super.releaseResourcesForWorker(workerId);
+        workerToContainerPid.remove(workerId);
+    }
+
+    /**
+     * The container terminates if any process inside the container dies.
+     * So we only need to check if the initial process is alive or not.
+     * @param user the user that the processes are running as
+     * @param workerId the id of the worker to kill
+     * @return true if all processes are dead; false otherwise
+     * @throws IOException on I/O exception
+     */
+    @Override
+    public boolean areAllProcessesDead(String user, String workerId) throws IOException {
+        boolean areAllDead = isContainerDead(workerId, user);
+        LOG.debug("WorkerId {}: Checking areAllProcessesDead: {}", workerId, areAllDead);
+        return areAllDead;
+    }
+
+    @Override
+    public void cleanup(String user, String workerId, int port) throws IOException {
+        LOG.debug("clean up worker {}", workerId);
+        try {
+            String containerId = getContainerId(workerId, port);
+            List<String> commands = Arrays.asList(CmdType.REAP_OCI_CONTAINER.toString(), containerId, String.valueOf(layersToKeep));
+            String logPrefix = "Worker Process " + workerId;
+            int result = ClientSupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+            if (result != 0) {
+                LOG.warn("Failed cleaning up RuncWorker {}", workerId);
+            }
+        } catch (FileNotFoundException e) {
+            // This could happen if we had an IOException and failed launching the worker.
+            // We need to continue on in order for the worker directory to get cleaned up.
+            LOG.error("Failed to find container id for {} ({}), unable to reap container", workerId, e.getMessage());
+        }
+        //remove from the watched list
+        LOG.debug("Removing {} from the watched workers list", workerId);
+        workerToUser.remove(workerId);
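As an illustration for reviewers, the container-id convention used by `getContainerId` in this hunk (a known port yields `port + "-" + workerId`; a port of -1, as in the kill paths, forces the oci-config.json fallback) can be sketched standalone. This is a minimal sketch, not the Storm code: the JSON lookup is stubbed out as a plain parameter.

```java
public class ContainerIdSketch {
    // Mirrors getContainerId above: positive port -> "<port>-<workerId>",
    // otherwise fall back to whatever oci-config.json recorded at launch.
    // idFromOciJson stands in for the real file read, which is Storm-specific.
    static String containerId(String workerId, int port, String idFromOciJson) {
        if (port <= 0) { // e.g. when killing workers only the workerId is known
            return idFromOciJson;
        }
        return port + "-" + workerId;
    }

    public static void main(String[] args) {
        System.out.println(containerId("w1", 6700, null));    // 6700-w1
        System.out.println(containerId("w1", -1, "6700-w1")); // 6700-w1 via fallback
    }
}
```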

Review comment:
       Thanks for addressing this.  
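As a side note, the pidfile pattern in `getContainerPid` above (read the first line of the pid file, cache it per worker, return null on any failure so the caller treats the container as "not found") can be sketched standalone. Class, method, and file names here are illustrative, not the Storm helpers:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PidFileSketch {
    // per-worker cache, mirroring workerToContainerPid in the change above
    static final Map<String, Long> pidCache = new ConcurrentHashMap<>();

    // parse one line of a pidfile; null signals "treat as not found"
    static Long parsePidLine(String line) {
        try {
            return Long.parseLong(line.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // read-and-cache, like getContainerPid: null if missing or unreadable
    static Long readPid(String workerId, Path pidFile) {
        Long pid = pidCache.get(workerId);
        if (pid == null && Files.exists(pidFile)) {
            try {
                pid = parsePidLine(Files.readAllLines(pidFile).get(0));
                if (pid != null) {
                    pidCache.put(workerId, pid);
                }
            } catch (IOException e) {
                // fall through; caller treats null as "pid not found"
            }
        }
        return pid;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("container", ".pid");
        Files.write(tmp, "12345\n".getBytes());
        System.out.println(readPid("worker-1", tmp)); // 12345
        System.out.println(parsePidLine("not-a-pid")); // null
    }
}
```

Caching the pid is what lets kill/forceKill keep signaling the container's init process even after the worker directory (and its pidfile) starts being torn down.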



