This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 83c8063 AMBARI-24183. Log Feeder: read and ship docker container logs.(#1799) 83c8063 is described below commit 83c8063e593f0a67ac247c6d3e03876c4a8adb18 Author: Olivér Szabó <oleew...@gmail.com> AuthorDate: Thu Jul 19 15:40:42 2018 +0200 AMBARI-24183. Log Feeder: read and ship docker container logs.(#1799) --- .../api/model/inputconfig/InputFileDescriptor.java | 2 + .../inputconfig/impl/InputFileDescriptorImpl.java | 20 +++ .../pom.xml | 83 +++++++++++ .../ambari/logfeeder/ContainerMetadata.java} | 42 +++++- .../ambari/logfeeder/ContainerRegistry.java} | 26 +++- .../logfeeder/docker/DockerContainerRegistry.java | 145 ++++++++++++++++++ .../docker/DockerContainerRegistryMonitor.java | 50 +++++++ .../ambari/logfeeder/docker/DockerMetadata.java | 81 ++++++++++ .../docker/command/CommandExecutionHelper.java | 52 +++++++ .../logfeeder/docker/command/CommandResponse.java} | 34 ++++- .../docker/command/ContainerCommand.java} | 25 ++-- .../command/DockerInspectContainerCommand.java | 61 ++++++++ .../docker/command/DockerListContainerCommand.java | 54 +++++++ .../src/main/resources/log4j.properties} | 23 +-- .../ambari/logfeeder/plugin/input/Input.java | 9 ++ .../ambari-logsearch-logfeeder/pom.xml | 5 + .../ambari/logfeeder/common/ConfigHandler.java | 1 + .../logfeeder/common/LogFeederConstants.java | 3 + .../ambari/logfeeder/conf/ApplicationConfig.java | 14 ++ .../ambari/logfeeder/conf/LogFeederProps.java | 18 +++ .../ambari/logfeeder/filter/DockerLogFilter.java} | 20 ++- .../apache/ambari/logfeeder/filter/FilterGrok.java | 14 ++ .../apache/ambari/logfeeder/input/InputFile.java | 166 +++++++++++++++++---- .../ambari/logfeeder/input/InputManagerImpl.java | 17 +++ .../input/monitor/DockerLogFileUpdateMonitor.java | 101 +++++++++++++ .../logsearch/model/common/LSServerInputFile.java | 12 ++ ambari-logsearch/docker/docker-compose.yml | 8 + .../test-config/logfeeder/logfeeder.properties | 3 +- 
.../input.config-logsearch-docker.json | 31 ++++ ambari-logsearch/pom.xml | 1 + 30 files changed, 1029 insertions(+), 92 deletions(-) diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java index b58db6a..2689f82 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java @@ -27,4 +27,6 @@ public interface InputFileDescriptor extends InputFileBaseDescriptor { Integer getPathUpdateIntervalMin(); Integer getMaxAgeMin(); + + Boolean getDockerEnabled(); } diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java index 99b42fe..2ba53e6 100644 --- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java +++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputFileDescriptorImpl.java @@ -70,6 +70,17 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme @SerializedName("max_age_min") private Integer maxAgeMin; + @ShipperConfigElementDescription( + path = "/input/[]/docker", + type = "boolean", + description = "Input comes from a docker container.", + examples = {"true", "false"}, + defaultValue = "false" 
+ ) + @Expose + @SerializedName("docker") + private Boolean dockerEnabled; + @Override public Integer getDetachIntervalMin() { return this.detachIntervalMin; @@ -90,6 +101,11 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme return this.maxAgeMin; } + @Override + public Boolean getDockerEnabled() { + return dockerEnabled; + } + public void setDetachIntervalMin(Integer detachIntervalMin) { this.detachIntervalMin = detachIntervalMin; } @@ -105,4 +121,8 @@ public class InputFileDescriptorImpl extends InputFileBaseDescriptorImpl impleme public void setMaxAgeMin(Integer maxAgeMin) { this.maxAgeMin = maxAgeMin; } + + public void setDockerEnabled(Boolean dockerEnabled) { + this.dockerEnabled = dockerEnabled; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml new file mode 100644 index 0000000..66333f5 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-logsearch</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.0.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <packaging>jar</packaging> + <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId> + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.9.4</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>2.9.4</version> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <source>${jdk.version}</source> + <target>${jdk.version}</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/log4j.properties</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java similarity index 54% copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java index b58db6a..df3a80a 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerMetadata.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,41 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.ambari.logfeeder; -package org.apache.ambari.logsearch.config.api.model.inputconfig; +/** + * Holds container related metadata + **/ +public interface ContainerMetadata { -public interface InputFileDescriptor extends InputFileBaseDescriptor { - Integer getDetachIntervalMin(); + /** + * Id of the container, used for getting the right log path + * @return container id + */ + String getId(); - Integer getDetachTimeMin(); + /** + * Name of the container + * @return container name + */ + String getName(); - Integer getPathUpdateIntervalMin(); + /** + * Hostname of the container, can be container host itself or the actual hostname + * @return container host name + */ + String getHostName(); + + /** + * Log label + * @return log type label + */ + String getLogTypeLabel(); + + /** + * Log path of the container (should be json file) + * @return log path + */ + String getLogPath(); - Integer getMaxAgeMin(); } diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java similarity index 58% copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java index b58db6a..94f6a82 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/ContainerRegistry.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,25 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.ambari.logfeeder; -package org.apache.ambari.logsearch.config.api.model.inputconfig; +import java.util.Map; -public interface InputFileDescriptor extends InputFileBaseDescriptor { - Integer getDetachIntervalMin(); +/** + * Responsible of register or drop new / existing containers. + * @param <METADATA_TYPE> type of metadata - could be docker or other container implementation + */ +public interface ContainerRegistry<METADATA_TYPE extends ContainerMetadata> { - Integer getDetachTimeMin(); + /** + * Register process of running containers + */ + void register(); - Integer getPathUpdateIntervalMin(); + /** + * Holds container metadata per log component type and container id. + * @return container metadata + */ + Map<String, Map<String, METADATA_TYPE>> getContainerMetadataMap(); - Integer getMaxAgeMin(); } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java new file mode 100644 index 0000000..c3e816e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistry.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.docker; + +import org.apache.ambari.logfeeder.ContainerRegistry; +import org.apache.ambari.logfeeder.docker.command.DockerInspectContainerCommand; +import org.apache.ambari.logfeeder.docker.command.DockerListContainerCommand; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Register docker metadata from docker containers on the host (with listing containers and inspecting them) + */ +public final class DockerContainerRegistry implements ContainerRegistry<DockerMetadata> { + + private static final String LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL = "logfeeder.container.registry.docker.interval"; + private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistry.class); + + private static DockerContainerRegistry INSTANCE = null; + private final Properties configs; + private Map<String, Map<String, DockerMetadata>> dockerMetadataMap = new ConcurrentHashMap<>(); + private int waitIntervalMin = 5; + + private DockerContainerRegistry(Properties configs) { + this.configs = 
configs; + init(configs); + } + + @Override + public synchronized void register() { + Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = renewMetadata(); + if (!actualDockerMetadataMap.isEmpty()) { + dockerMetadataMap.putAll(actualDockerMetadataMap); + dockerMetadataMap = dockerMetadataMap + .entrySet() + .stream() + .filter(e -> actualDockerMetadataMap.keySet().contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + for (Map.Entry<String, Map<String, DockerMetadata>> entry : dockerMetadataMap.entrySet()) { + for (Map.Entry<String, DockerMetadata> metadataEntry : entry.getValue().entrySet()) { + logger.debug("Found container metadata: {}", entry.getValue().toString()); + } + } + } + } + + private Map<String, Map<String, DockerMetadata>> renewMetadata() { + final Map<String, Map<String, DockerMetadata>> actualDockerMetadataMap = new HashMap<>(); + final List<String> containerIds = new DockerListContainerCommand().execute(null); + final Map<String, String> params = new HashMap<>(); + + params.put("containerIds", StringUtils.join(containerIds, ",")); + List<Map<String, Object>> containerDataList = new DockerInspectContainerCommand().execute(params); + + for (Map<String, Object> containerDataMap : containerDataList) { + String id = containerDataMap.get("Id").toString(); + String name = containerDataMap.get("Name").toString(); + String logPath = containerDataMap.get("LogPath").toString(); + Map<String, Object> dockerConfigMap = (HashMap<String, Object>) containerDataMap.get("Config"); + String hostname = dockerConfigMap.get("Hostname").toString(); + Map<String, String> labels = (Map<String, String>) dockerConfigMap.get("Labels"); + Map<String, Object> stateMap = (HashMap<String, Object>) containerDataMap.get("State"); + String componentType = labels.get("logfeeder.log.type"); + boolean running = (Boolean) stateMap.get("Running"); + long timestamp = running ? 
convertDateStrToLong((String)stateMap.get("StartedAt")) : convertDateStrToLong((String)stateMap.get("FinishedAt")); + + if (componentType != null) { + if (actualDockerMetadataMap.containsKey(componentType)) { + Map<String, DockerMetadata> componentMetadataMap = actualDockerMetadataMap.get(componentType); + componentMetadataMap.put(id, new DockerMetadata(id, name, hostname, componentType, logPath, running, timestamp)); + actualDockerMetadataMap.put(componentType, componentMetadataMap); + } else { + Map<String, DockerMetadata> componentMetadataMap = new HashMap<>(); + componentMetadataMap.put(id, new DockerMetadata(id, name, hostname, componentType, logPath, running, timestamp)); + actualDockerMetadataMap.put(componentType, componentMetadataMap); + } + } else { + logger.debug("Ignoring docker metadata from registry as container (id: {}, name: {}) as it has no 'logfeeder.log.type' label", id, name); + } + } + + return actualDockerMetadataMap; + } + + @Override + public synchronized Map<String, Map<String, DockerMetadata>> getContainerMetadataMap() { + return dockerMetadataMap; + } + + public void init(Properties configs) { + // init docker related data + String waitStr = configs.getProperty(LOGFEEDER_CONTAINER_REGISTRY_DOCKER_INTERVAL, "5"); + setWaitIntervalMin(Integer.parseInt(waitStr)); + // TODO: add docker authentication settings through this + } + + public static synchronized DockerContainerRegistry getInstance(Properties dockerConfig) { + if (INSTANCE == null) { + return new DockerContainerRegistry(dockerConfig); + } else { + return INSTANCE; + } + } + + public int getWaitIntervalMin() { + return waitIntervalMin; + } + + public void setWaitIntervalMin(int waitIntervalMin) { + this.waitIntervalMin = waitIntervalMin; + } + + private long convertDateStrToLong(String timestampStr) { + LocalDateTime localDateTime = LocalDateTime.parse(timestampStr, DateTimeFormatter.ISO_DATE_TIME); + return localDateTime.toInstant(ZoneOffset.ofTotalSeconds(0)).toEpochMilli(); + } +} 
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java new file mode 100644 index 0000000..30c328d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerContainerRegistryMonitor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.docker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Periodically re-register docker container metadata for {@link org.apache.ambari.logfeeder.docker.DockerContainerRegistry} + * based on a time interval in seconds (property: logfeeder.container.registry.docker.interval, default: 5) + */ +public class DockerContainerRegistryMonitor implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(DockerContainerRegistryMonitor.class); + + private final DockerContainerRegistry registry; + + public DockerContainerRegistryMonitor(DockerContainerRegistry registry) { + this.registry = registry; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + logger.debug("Gather docker containers metadata ..."); + registry.register(); + Thread.sleep(1000 * registry.getWaitIntervalMin()); + } catch (Exception e) { + logger.error("Error during gather docker containers metadata.", e); + } + } + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java new file mode 100644 index 0000000..65842b4 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/DockerMetadata.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.docker; + +import org.apache.ambari.logfeeder.ContainerMetadata; + +public class DockerMetadata implements ContainerMetadata { + + private final String id; + private final String name; + private final String logTypeLabel; + private final String logPath; + private final String hostName; + private final boolean running; + private final long timestamp; + + public DockerMetadata(String id, String name, String hostName, String logTypeLabel, String logPath, boolean running, long timestamp) { + this.id = id; + this.name = name; + this.hostName = hostName; + this.logTypeLabel = logTypeLabel; + this.logPath = logPath; + this.running = running; + this.timestamp = timestamp; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getHostName() { + return hostName; + } + + public String getLogTypeLabel() { + return logTypeLabel; + } + + public String getLogPath() { + return logPath; + } + + public boolean isRunning() { + return running; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "DockerMetadata{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", logTypeLabel='" + logTypeLabel + '\'' + + ", logPath='" + logPath + '\'' + + ", hostName='" + hostName + '\'' + + '}'; + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java 
b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java new file mode 100644 index 0000000..aa65c60 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandExecutionHelper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.docker.command; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class CommandExecutionHelper { + + public static CommandResponse executeCommand(List<String> commands, Map<String, String> envMap) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(commands); + Map<String, String> env = processBuilder.environment(); + if (envMap != null) { + env.putAll(envMap); + } + Process shell = processBuilder.start(); + + BufferedReader stdInput = new BufferedReader(new InputStreamReader(shell.getInputStream())); + BufferedReader stdError = new BufferedReader(new InputStreamReader(shell.getErrorStream())); + List<String> stdOutLines = new ArrayList<>(); + StringBuilder errOut = new StringBuilder(); + String s = null; + while ((s = stdInput.readLine()) != null) { + stdOutLines.add(s); + } + while ((s = stdError.readLine()) != null) { + errOut.append(s); + } + int exitCode = shell.waitFor(); + + return new CommandResponse(exitCode, stdOutLines, errOut.toString()); + } +} diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java similarity index 55% copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java index b58db6a..7ead791 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/CommandResponse.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,33 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.ambari.logfeeder.docker.command; -package org.apache.ambari.logsearch.config.api.model.inputconfig; +import java.util.List; -public interface InputFileDescriptor extends InputFileBaseDescriptor { - Integer getDetachIntervalMin(); +/** + * Represent a bash command response (stdout as string list, stderr in string and an exit code) + */ +public class CommandResponse { + private final int exitCode; + private final List<String> stdOut; + private final String stdErr; + + CommandResponse(int exitCode, List<String> stdOut, String stdErr) { + this.exitCode = exitCode; + this.stdOut = stdOut; + this.stdErr = stdErr; + } - Integer getDetachTimeMin(); + public int getExitCode() { + return exitCode; + } - Integer getPathUpdateIntervalMin(); + public List<String> getStdOut() { + return stdOut; + } - Integer getMaxAgeMin(); + public String getStdErr() { + return stdErr; + } } diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java similarity index 63% copy from 
ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java index b58db6a..db3de01 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/ContainerCommand.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,20 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.ambari.logfeeder.docker.command; -package org.apache.ambari.logsearch.config.api.model.inputconfig; +import java.util.Map; -public interface InputFileDescriptor extends InputFileBaseDescriptor { - Integer getDetachIntervalMin(); - - Integer getDetachTimeMin(); - - Integer getPathUpdateIntervalMin(); +/** + * Responsible of execute container commands. 
(like listing or inspecting containers) + * @param <RESPONSE_TYPE> + */ +public interface ContainerCommand<RESPONSE_TYPE> { - Integer getMaxAgeMin(); + /** + * Execute a container command + * @param params extra parameters for the command + * @return return type of the execution - can be anything + */ + RESPONSE_TYPE execute(Map<String, String> params); } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java new file mode 100644 index 0000000..d4fc182 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerInspectContainerCommand.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.docker.command; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Run 'docker inspect' on container ids - and read response and convert it from json response to a map object + */ +public class DockerInspectContainerCommand implements ContainerCommand<List<Map<String, Object>>> { + + private static final Logger logger = LoggerFactory.getLogger(DockerInspectContainerCommand.class); + + @Override + public List<Map<String, Object>> execute(Map<String, String> params) { + List<String> containerIds = Arrays.asList(params.get("containerIds").split(",")); + CommandResponse commandResponse = null; + List<Map<String, Object>> listResponse = new ArrayList<>(); + List<String> commandList = new ArrayList<>(); + commandList.add("/usr/local/bin/docker"); + commandList.add("inspect"); + commandList.addAll(containerIds); + try { + commandResponse = CommandExecutionHelper.executeCommand(commandList, null); + if (commandResponse.getExitCode() != 0) { + logger.error("Error during inspect containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode()); + } else { + String jsonResponse = StringUtils.join(commandResponse.getStdOut(), ""); + ObjectMapper mapper = new ObjectMapper(); + listResponse = mapper.readValue(jsonResponse, List.class); + } + } catch (Exception e) { + logger.error("Error during inspect containers request", e); + } + return listResponse; + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java new file 
mode 100644 index 0000000..a0596ca --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/java/org/apache/ambari/logfeeder/docker/command/DockerListContainerCommand.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.docker.command; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Run 'docker ps -a -q' (+ logfeeder type filter) and save the response in a string list (container ids) + */ +public class DockerListContainerCommand implements ContainerCommand<List<String>> { + + private static final Logger logger = LoggerFactory.getLogger(DockerListContainerCommand.class); + + @Override + public List<String> execute(Map<String, String> params) { + CommandResponse commandResponse = null; + List<String> commandList = new ArrayList<>(); + commandList.add("/usr/local/bin/docker"); + commandList.add("ps"); + commandList.add("-a"); + commandList.add("-q"); + // TODO: add --filter="label=logfeeder.log.type" + try { + commandResponse = CommandExecutionHelper.executeCommand(commandList, null); + if (commandResponse.getExitCode() != 0) { + logger.error("Error during inspect containers request: {} (exit code: {})", commandResponse.getStdErr(), commandResponse.getExitCode()); + } + } catch (Exception e) { + logger.error("Error during inspect containers request", e); + } + return commandResponse != null ? commandResponse.getStdOut() : null; + } +} diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties similarity index 52% copy from ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties copy to ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties index 16cbb0f..6380ac7 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties +++ b/ambari-logsearch/ambari-logsearch-logfeeder-container-registry/src/main/resources/log4j.properties @@ -12,21 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. - -cluster.name=CL1 -logfeeder.checkpoint.folder=/root/checkpoints -logfeeder.metrics.collector.hosts= -logfeeder.config.dir=/root/config/logfeeder/shipper-conf/ -logfeeder.config.files=shipper-conf/global.config.json,\ - shipper-conf/output.config.json -logfeeder.log.filter.enable=true -logfeeder.solr.config.interval=5 -logfeeder.solr.core.config.name=history -logfeeder.solr.zk_connect_string=localhost:9983 -logfeeder.cache.enabled=true -logfeeder.cache.size=100 -logfeeder.cache.key.field=log_message -logfeeder.cache.dedup.interval=1000 -logfeeder.cache.last.dedup.enabled=true -logsearch.config.zk_connect_string=localhost:9983 -logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN \ No newline at end of file +log4j.rootLogger=DEBUG, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd'T'HH:mm:ss.SSS} %-5p [%t] - %m%n \ No newline at end of file diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java index a586510..ed0edcd 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java @@ -51,6 +51,7 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER private Filter<PROP_TYPE> firstFilter; private boolean isClosed; private String type; + private String logType; private boolean useEventMD5 = false; private boolean genEventMD5 = true; private Thread 
thread; @@ -238,6 +239,14 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER this.type = type; } + public String getLogType() { + return logType; + } + + public void setLogType(String logType) { + this.logType = logType; + } + public boolean isUseEventMD5() { return useEventMD5; } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index 24bff8d..2544c4c 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -54,6 +54,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-logsearch-logfeeder-container-registry</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.3.1</version> diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java index 2a23cd7..80b7104 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java @@ -297,6 +297,7 @@ public class ConfigHandler implements InputConfigMonitor { continue; } input.setType(source); + input.setLogType(inputDescriptor.getType()); input.loadConfig(inputDescriptor); if (input.isEnabled()) { diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index 80dc163..a988840 100644 --- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -89,4 +89,7 @@ public class LogFeederConstants { public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension"; public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; + public static final String DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY = "logfeeder.docker.registry.enabled"; + public static final boolean DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT = false; + } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java index cfb199c..ee8cdcb 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java @@ -19,7 +19,10 @@ package org.apache.ambari.logfeeder.conf; import com.google.common.collect.Maps; +import org.apache.ambari.logfeeder.ContainerRegistry; +import org.apache.ambari.logfeeder.docker.DockerContainerRegistry; import org.apache.ambari.logfeeder.common.LogFeederConstants; +import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor; import org.apache.ambari.logfeeder.input.InputConfigUploader; import org.apache.ambari.logfeeder.input.InputManagerImpl; import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler; @@ -39,6 +42,7 @@ import org.springframework.context.annotation.PropertySource; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import javax.inject.Inject; +import java.util.Properties; @Configuration @PropertySource(value = { @@ -99,6 +103,7 @@ 
public class ApplicationConfig { @Bean + @DependsOn("containerRegistry") public InputManager inputManager() { return new InputManagerImpl(); } @@ -107,4 +112,13 @@ public class ApplicationConfig { public OutputManager outputManager() { return new OutputManagerImpl(); } + + @Bean + public DockerContainerRegistry containerRegistry() { + if (logFeederProps.isDockerContainerRegistryEnabled()) { + return DockerContainerRegistry.getInstance(logFeederProps.getProperties()); + } else { + return null; + } + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index a75b2d6..9a29f86 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -130,6 +130,16 @@ public class LogFeederProps implements LogFeederProperties { @Value("${" + LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY + ":/usr/lib/ambari-logsearch-logfeeder/conf/checkpoints}") public String checkpointFolder; + @LogSearchPropertyDescription( + name = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY, + description = "", + examples = {"true"}, + defaultValue = LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_DEFAULT + "", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${" + LogFeederConstants.DOCKER_CONTAINER_REGISTRY_ENABLED_PROPERTY + ":false}") + public boolean dockerContainerRegistryEnabled; + @Inject private LogEntryCacheConfig logEntryCacheConfig; @@ -227,6 +237,14 @@ public class LogFeederProps implements LogFeederProperties { this.solrImplicitRouting = solrImplicitRouting; } + public boolean isDockerContainerRegistryEnabled() { + return dockerContainerRegistryEnabled; + } + + public void 
setDockerContainerRegistryEnabled(boolean dockerContainerRegistryEnabled) { + this.dockerContainerRegistryEnabled = dockerContainerRegistryEnabled; + } + @PostConstruct public void init() { properties = new Properties(); diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java similarity index 68% copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java copy to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java index b58db6a..ab13775 100644 --- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputFileDescriptor.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/DockerLogFilter.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,19 @@ * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.ambari.logfeeder.filter; -package org.apache.ambari.logsearch.config.api.model.inputconfig; +import org.apache.ambari.logfeeder.util.LogFeederUtil; -public interface InputFileDescriptor extends InputFileBaseDescriptor { - Integer getDetachIntervalMin(); +import java.util.Map; - Integer getDetachTimeMin(); +public class DockerLogFilter { - Integer getPathUpdateIntervalMin(); + private DockerLogFilter() { + } - Integer getMaxAgeMin(); + public static String getLogFromDockerJson(String jsonInput) { + Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(jsonInput); + return jsonMap.get("log").toString(); + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java index 2074f93..5ed61cc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -23,11 +23,14 @@ import com.google.gson.reflect.TypeToken; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.exception.GrokException; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.input.InputFile; import org.apache.ambari.logfeeder.plugin.common.MetricData; import org.apache.ambari.logfeeder.plugin.filter.Filter; +import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.plugin.input.InputMarker; import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor; +import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; @@ -73,6 +76,8 @@ 
public class FilterGrok extends Filter<LogFeederProps> { private boolean skipOnError = false; + private boolean dockerEnabled = false; + @Override public void init(LogFeederProps logFeederProps) throws Exception { super.init(logFeederProps); @@ -83,6 +88,12 @@ public class FilterGrok extends Filter<LogFeederProps> { sourceField = getFilterDescriptor().getSourceField(); removeSourceField = BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(), removeSourceField); skipOnError = ((FilterGrokDescriptor) getFilterDescriptor()).isSkipOnError(); + if (logFeederProps.isDockerContainerRegistryEnabled()) { + Input input = getInput(); + if (input != null && input instanceof InputFile) { + dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false); + } + } LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " + getShortDescription()); @@ -178,6 +189,9 @@ public class FilterGrok extends Filter<LogFeederProps> { @Override public void apply(String inputStr, InputMarker inputMarker) throws Exception { + if (dockerEnabled) { + inputStr = DockerLogFilter.getLogFromDockerJson(inputStr); + } if (grokMessage == null) { return; } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 726a237..441ce3e 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -20,6 +20,9 @@ package org.apache.ambari.logfeeder.input; import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import 
org.apache.ambari.logfeeder.docker.DockerContainerRegistry; +import org.apache.ambari.logfeeder.docker.DockerMetadata; +import org.apache.ambari.logfeeder.input.monitor.DockerLogFileUpdateMonitor; import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor; import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; @@ -29,7 +32,6 @@ import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper; import org.apache.ambari.logfeeder.plugin.filter.Filter; import org.apache.ambari.logfeeder.plugin.input.Input; import org.apache.ambari.logfeeder.util.FileUtil; -import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor; import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor; import org.apache.commons.lang.BooleanUtils; @@ -81,28 +83,43 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { private Thread thread; private Thread logFileDetacherThread; private Thread logFilePathUpdaterThread; + private Thread dockerLogFileUpdateMonitorThread; private ThreadGroup threadGroup; private boolean multiFolder = false; + private boolean dockerLog = false; + private boolean dockerLogParent = true; + private DockerContainerRegistry dockerContainerRegistry; private Map<String, List<File>> folderMap; private Map<String, InputFile> inputChildMap = new HashMap<>(); @Override public boolean isReady() { if (!isReady) { - // Let's try to check whether the file is available - logFiles = getActualInputLogFiles(); - Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles); - setFolderMap(foldersMap); - if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) { - if (tail && logFiles.length > 1) { - LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + - ". Will follow only the first one. 
Using " + logFiles[0].getAbsolutePath()); + if (dockerLog) { + if (dockerContainerRegistry != null) { + Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap(); + String logType = getLogType(); + if (metadataMap.containsKey(logType)) { + isReady = true; + } + } else { + LOG.warn("Docker registry is not set, probably docker registry usage is not enabled."); } - LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath()); - isReady = true; } else { - LOG.debug(logPath + " file doesn't exist. Ignoring for now"); + logFiles = getActualInputLogFiles(); + Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles); + setFolderMap(foldersMap); + if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) { + if (tail && logFiles.length > 1) { + LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + + ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath()); + } + LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath()); + isReady = true; + } else { + LOG.debug(logPath + " file doesn't exist. 
Ignoring for now"); + } } } return isReady; @@ -150,7 +167,25 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { @Override public boolean monitor() { if (isReady()) { - if (multiFolder) { + if (dockerLog && dockerLogParent) { + Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap(); + String logType = getLogType(); + threadGroup = new ThreadGroup("docker-parent-" + logType); + if (metadataMap.containsKey(logType)) { + Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(logType); + for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : dockerMetadataMap.entrySet()) { + try { + startNewChildDockerInputFileThread(dockerMetadataEntry.getValue()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + dockerLogFileUpdateMonitorThread = new Thread(new DockerLogFileUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "docker_logfiles_updater=" + logType); + dockerLogFileUpdateMonitorThread.setDaemon(true); + dockerLogFileUpdateMonitorThread.start(); + } + } + else if (multiFolder) { try { threadGroup = new ThreadGroup(getNameForThread()); if (getFolderMap() != null) { @@ -181,6 +216,7 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { @Override public InputFileMarker getInputMarker() { + // TODO: use this return null; } @@ -190,10 +226,6 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { LOG.info("init() called"); checkPointExtension = logFeederProps.getCheckPointExtension(); - - // Let's close the file and set it to true after we start monitoring it - setClosed(true); - logPath = getInputDescriptor().getPath(); checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS); detachIntervalMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(), 
DEFAULT_DETACH_INTERVAL_MIN * 60); detachTimeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(), DEFAULT_DETACH_TIME_MIN * 60); @@ -201,23 +233,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { maxAgeMin = (int) ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(), 0); boolean initDefaultFields = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(), false); setInitDefaultFields(initDefaultFields); - if (StringUtils.isEmpty(logPath)) { - LOG.error("path is empty for file input. " + getShortDescription()); - return; - } - setFilePath(logPath); - // Check there can have pattern in folder - if (getFilePath() != null && getFilePath().contains("/")) { - int lastIndexOfSlash = getFilePath().lastIndexOf("/"); - String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash); - if (folderBeforeLogName.contains("*")) { - LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders."); - setMultiFolder(true); + // Let's close the file and set it to true after we start monitoring it + setClosed(true); + dockerLog = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDockerEnabled(), false); + if (dockerLog) { + if (logFeederProps.isDockerContainerRegistryEnabled()) { + boolean isFileReady = isReady(); + LOG.info("Container type to monitor " + getType() + ", tail=" + tail + ", isReady=" + isFileReady); + } else { + LOG.warn("Using docker input, but docker registry usage is not enabled."); + } + } else { + logPath = getInputDescriptor().getPath(); + if (StringUtils.isEmpty(logPath)) { + LOG.error("path is empty for file input. 
" + getShortDescription()); + return; } + + setFilePath(logPath); + // Check there can have pattern in folder + if (getFilePath() != null && getFilePath().contains("/")) { + int lastIndexOfSlash = getFilePath().lastIndexOf("/"); + String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash); + if (folderBeforeLogName.contains("*")) { + LOG.info("Found regex in folder path ('" + getFilePath() + "'), will check against multiple folders."); + setMultiFolder(true); + } + } + boolean isFileReady = isReady(); + LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady); } - boolean isFileReady = isReady(); - LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady); LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig(); initCache( @@ -295,6 +341,37 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { } } + public void startNewChildDockerInputFileThread(DockerMetadata dockerMetadata) throws CloneNotSupportedException { + LOG.info("Start docker child input thread - " + dockerMetadata.getLogPath()); + InputFile clonedObject = (InputFile) this.clone(); + clonedObject.setDockerLogParent(false); + clonedObject.logPath = dockerMetadata.getLogPath(); + clonedObject.setFilePath(logPath); + clonedObject.logFiles = new File[]{new File(dockerMetadata.getLogPath())}; + clonedObject.setInputChildMap(new HashMap<>()); + clonedObject.setDockerLogFileUpdateMonitorThread(null); + copyFilters(clonedObject, getFirstFilter()); + Thread thread = new Thread(threadGroup, clonedObject, "file=" + dockerMetadata.getLogPath()); + clonedObject.setThread(thread); + inputChildMap.put(dockerMetadata.getLogPath(), clonedObject); + thread.start(); + } + + public void stopChildDockerInputFileThread(String logPathKey) { + LOG.info("Stop child input thread - " + logPathKey); + String filePath = new File(logPathKey).getName(); + if (inputChildMap.containsKey(logPathKey)) { + InputFile inputFile = 
inputChildMap.get(logPathKey); + inputFile.setClosed(true); + if (inputFile.getThread() != null && inputFile.getThread().isAlive()) { + inputFile.getThread().interrupt(); + } + inputChildMap.remove(logPathKey); + } else { + LOG.warn(logPathKey + " not found as an input child."); + } + } + public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException { LOG.info("Start child input thread - " + folderFileEntry.getKey()); InputFile clonedObject = (InputFile) this.clone(); @@ -506,8 +583,35 @@ public class InputFile extends Input<LogFeederProps, InputFileMarker> { this.logFilePathUpdaterThread = logFilePathUpdaterThread; } + public Thread getDockerLogFileUpdateMonitorThread() { + return dockerLogFileUpdateMonitorThread; + } + + public void setDockerLogFileUpdateMonitorThread(Thread dockerLogFileUpdateMonitorThread) { + this.dockerLogFileUpdateMonitorThread = dockerLogFileUpdateMonitorThread; + } + public Integer getMaxAgeMin() { return maxAgeMin; } + public void setDockerContainerRegistry(DockerContainerRegistry dockerContainerRegistry) { + this.dockerContainerRegistry = dockerContainerRegistry; + } + + public DockerContainerRegistry getDockerContainerRegistry() { + return dockerContainerRegistry; + } + + public boolean isDockerLog() { + return dockerLog; + } + + public boolean isDockerLogParent() { + return dockerLogParent; + } + + public void setDockerLogParent(boolean dockerLogParent) { + this.dockerLogParent = dockerLogParent; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java index 40475c6..ea97968 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java @@ -20,6 +20,8 @@ package org.apache.ambari.logfeeder.input; import com.google.common.annotations.VisibleForTesting; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.docker.DockerContainerRegistry; +import org.apache.ambari.logfeeder.docker.DockerContainerRegistryMonitor; import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor; import org.apache.ambari.logfeeder.plugin.common.MetricData; import org.apache.ambari.logfeeder.plugin.input.Input; @@ -65,6 +67,9 @@ public class InputManagerImpl extends InputManager { private Thread inputIsReadyMonitor; @Inject + private DockerContainerRegistry dockerContainerRegistry; + + @Inject private LogFeederProps logFeederProps; public List<Input> getInputList(String serviceName) { @@ -127,6 +132,7 @@ public class InputManagerImpl extends InputManager { public void init() throws Exception { initCheckPointSettings(); startMonitorThread(); + startDockerMetadataThread(); } private void initCheckPointSettings() { @@ -162,6 +168,13 @@ public class InputManagerImpl extends InputManager { } } + private void startDockerMetadataThread() { + if (logFeederProps.isDockerContainerRegistryEnabled()) { + Thread obtaiinDockerMetadataThread = new Thread(new DockerContainerRegistryMonitor(dockerContainerRegistry), "obtain_docker_metadata"); + obtaiinDockerMetadataThread.start(); + } + } + private void startMonitorThread() { inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { @Override @@ -199,6 +212,10 @@ public class InputManagerImpl extends InputManager { public void startInputs(String serviceName) { for (Input input : inputs.get(serviceName)) { try { + if (input instanceof InputFile) {// apply docker metadata registry + InputFile inputFile = (InputFile) input; + inputFile.setDockerContainerRegistry(dockerContainerRegistry); + } input.init(logFeederProps); if 
(input.isReady()) { input.monitor(); diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java new file mode 100644 index 0000000..0275827 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/DockerLogFileUpdateMonitor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.monitor; + +import org.apache.ambari.logfeeder.docker.DockerContainerRegistry; +import org.apache.ambari.logfeeder.docker.DockerMetadata; +import org.apache.ambari.logfeeder.input.InputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * Periodically check docker containers metadata registry, stop monitoring container log files if those do not exist or stopped too long time ago. + * If it finds a new container log for the specific type, it will start to monitoring it. 
+ * <br/> + * Use cases:<br/> + * - input is not monitored yet - found new container -> start monitoring it <br/> + * - input is not monitored yet - found new stopped container -> start monitoring it <br/> + * - input is not monitored yet - found new stopped container but log is too old -> do not monitor it <br/> + * - input is monitored already - container stopped - if it has been stopped for too long -> remove it from the monitored list<br/> + * - input is monitored already - container stopped - log is not too old -> keep in the monitored list <br/> + * - input is monitored already - container does not exist - remove it from the monitored list (and all other inputs with the same log type) <br/> + */ +public class DockerLogFileUpdateMonitor extends AbstractLogFileMonitor { + + private Logger LOG = LoggerFactory.getLogger(DockerLogFileUpdateMonitor.class); + + public DockerLogFileUpdateMonitor(InputFile inputFile, int waitInterval, int detachTime) { + super(inputFile, waitInterval, detachTime); + } + + @Override + protected String getStartLog() { + return "Start docker component type log files monitor thread for " + getInputFile().getLogType(); + } + + @Override + protected void monitorAndUpdate() throws Exception { + DockerContainerRegistry dockerContainerRegistry = getInputFile().getDockerContainerRegistry(); + Map<String, Map<String, DockerMetadata>> dockerMetadataMapByType = dockerContainerRegistry.getContainerMetadataMap(); + String logType = getInputFile().getLogType(); + Map<String, InputFile> copiedChildMap = new HashMap<>(getInputFile().getInputChildMap()); + + if (dockerMetadataMapByType.containsKey(logType)) { + Map<String, DockerMetadata> dockerMetadataMap = dockerMetadataMapByType.get(logType); + for (Map.Entry<String, DockerMetadata> containerEntry : dockerMetadataMap.entrySet()) { + String logPath = containerEntry.getValue().getLogPath(); + String containerId = containerEntry.getValue().getId(); + long timestamp = 
containerEntry.getValue().getTimestamp(); + boolean running = containerEntry.getValue().isRunning(); + LOG.debug("Found log path: {} (container id: {})", logPath, containerId); + if (!copiedChildMap.containsKey(logPath)) { + if (!running && isItTooOld(timestamp, new Date().getTime(), getDetachTime())) { + LOG.debug("Container with id {} is stopped, won't monitor as it stopped for long time.", containerId); + } else { + LOG.info("Found new container (id: {}) with new log path: {}", logPath, containerId); + getInputFile().startNewChildDockerInputFileThread(containerEntry.getValue()); + } + } else { + if (!running && isItTooOld(timestamp, new Date().getTime(), getDetachTime())) { + LOG.info("Removing: {}", logPath); + getInputFile().stopChildDockerInputFileThread(containerEntry.getKey()); + } + } + } + } else { + if (!copiedChildMap.isEmpty()) { + LOG.info("Removing all inputs with type: {}", logType); + for (Map.Entry<String, InputFile> inputFileEntry : copiedChildMap.entrySet()) { + LOG.info("Removing: {}", inputFileEntry.getKey()); + getInputFile().stopChildDockerInputFileThread(inputFileEntry.getKey()); + } + } + } + } + + private boolean isItTooOld(long timestamp, long actualTimestamp, long maxDiffMinutes) { + long diff = actualTimestamp - timestamp; + long maxDiffMins = maxDiffMinutes * 1000 * 60; + return diff > maxDiffMins; + } +} diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java index efa56a2..012455e 100644 --- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java +++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputFile.java @@ -40,6 +40,9 @@ public class LSServerInputFile extends LSServerInputFileBase { 
@JsonProperty("max_age_min") private Integer maxAgeMin; + @JsonProperty("docker") + private Boolean dockerEnabled; + public LSServerInputFile() {} public LSServerInputFile(InputDescriptor inputDescriptor) { @@ -49,6 +52,7 @@ public class LSServerInputFile extends LSServerInputFileBase { this.detachTimeMin = inputFileDescriptor.getDetachTimeMin(); this.pathUpdateIntervalMin = inputFileDescriptor.getPathUpdateIntervalMin(); this.maxAgeMin = inputFileDescriptor.getMaxAgeMin(); + this.dockerEnabled = inputFileDescriptor.getDockerEnabled(); } public Integer getDetachIntervalMin() { @@ -82,4 +86,12 @@ public class LSServerInputFile extends LSServerInputFileBase { public void setMaxAgeMin(Integer maxAgeMin) { this.maxAgeMin = maxAgeMin; } + + public Boolean getDockerEnabled() { + return dockerEnabled; + } + + public void setDockerEnabled(Boolean dockerEnabled) { + this.dockerEnabled = dockerEnabled; + } } diff --git a/ambari-logsearch/docker/docker-compose.yml b/ambari-logsearch/docker/docker-compose.yml index 99e0f18..b73ee5c 100644 --- a/ambari-logsearch/docker/docker-compose.yml +++ b/ambari-logsearch/docker/docker-compose.yml @@ -47,6 +47,8 @@ services: image: ambari-logsearch:v1.0 restart: always hostname: logsearch.apache.org + labels: + logfeeder.log.type: "logsearch_server" networks: - logsearch-network env_file: @@ -68,6 +70,9 @@ services: image: ambari-logsearch:v1.0 restart: always hostname: logfeeder.apache.org + privileged: true + labels: + logfeeder.log.type: "logfeeder" networks: - logsearch-network env_file: @@ -82,6 +87,9 @@ services: - $AMBARI_LOCATION:/root/ambari - $AMBARI_LOCATION/ambari-logsearch/docker/test-logs:/root/test-logs - $AMBARI_LOCATION/ambari-logsearch/docker/test-config:/root/test-config + - /var/run/docker.sock:/var/run/docker.sock + - /usr/local/bin/docker:/usr/local/bin/docker + - /var/lib/docker:/var/lib/docker networks: logsearch-network: driver: bridge diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties 
b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties index 16cbb0f..850aca2 100644 --- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties +++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties @@ -29,4 +29,5 @@ logfeeder.cache.key.field=log_message logfeeder.cache.dedup.interval=1000 logfeeder.cache.last.dedup.enabled=true logsearch.config.zk_connect_string=localhost:9983 -logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN \ No newline at end of file +logfeeder.include.default.level=FATAL,ERROR,WARN,INFO,DEBUG,TRACE,UNKNOWN +logfeeder.docker.registry.enabled=true \ No newline at end of file diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json new file mode 100644 index 0000000..a420960 --- /dev/null +++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-logsearch-docker.json @@ -0,0 +1,31 @@ +{ + "input": [ + { + "type": "logsearch_server", + "rowtype": "service", + "docker": "true" + } + ], + "filter": [ + { + "filter": "grok", + "conditions": { + "fields": { + "type": [ + "logsearch_server" + ] + } + }, + "log4j_format": "", + "multiline_pattern": "^(%{DATESTAMP:logtime})", + "message_pattern": "(?m)^%{DATESTAMP:logtime}%{SPACE}\\[%{DATA:thread_name}\\]%{SPACE}%{LOGLEVEL:level}%{SPACE}%{JAVACLASS}%{SPACE}\\(%{JAVAFILE:file}:%{INT:line_number}\\)%{SPACE}-%{SPACE}%{GREEDYDATA:log_message}", + "post_map_values": { + "logtime": { + "map_date": { + "target_date_pattern":"yyyy-MM-dd HH:mm:ss,SSS" + } + } + } + } + ] +} diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml index 5fbbb33..6d5f9f1 100644 --- a/ambari-logsearch/pom.xml +++ b/ambari-logsearch/pom.xml @@ -33,6 +33,7 @@ <module>ambari-logsearch-config-zookeeper</module> <module>ambari-logsearch-it</module> 
<module>ambari-logsearch-logfeeder-plugin-api</module> + <module>ambari-logsearch-logfeeder-container-registry</module> </modules> <properties> <jdk.version>1.8</jdk.version>