This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch HDDS-2071 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit d6f95be6d047edc0caa7752fb958e6d61c4694bd Author: Márton Elek <e...@apache.org> AuthorDate: Thu Oct 3 17:06:26 2019 +0200 HDDS-2071. Support filters in ozone insight point --- .../server/OzoneProtocolMessageDispatcher.java | 4 +- .../hadoop/hdds/server/events/EventQueue.java | 4 +- .../hadoop/hdds/scm/node/SCMNodeManager.java | 50 ++++++++------- .../hadoop/ozone/insight/BaseInsightPoint.java | 15 +++++ .../apache/hadoop/ozone/insight/InsightPoint.java | 6 +- .../apache/hadoop/ozone/insight/LogSubcommand.java | 26 ++++++-- .../ozone/insight/datanode/RatisInsight.java | 5 ++ .../hadoop/ozone/insight/TestBaseInsightPoint.java | 73 ++++++++++++++++++++++ 8 files changed, 152 insertions(+), 31 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java index d67a759..2f4f821 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java @@ -59,7 +59,7 @@ public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> { try { if (logger.isTraceEnabled()) { logger.trace( - "{} {} request is received: <json>{}</json>", + "[service={}] [type={}] request is received: <json>{}</json>", serviceName, type.toString(), request.toString().replaceAll("\n", "\\\\n")); @@ -73,7 +73,7 @@ public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> { if (logger.isTraceEnabled()) { logger.trace( - "{} {} request is processed. Response: " + "[service={}] [type={}] request is processed. Response: " + "<json>{}</json>", serviceName, type.toString(), diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 91e0153..766b3a9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -163,12 +163,12 @@ public class EventQueue implements EventPublisher, AutoCloseable { queuedCount.incrementAndGet(); if (LOG.isTraceEnabled()) { LOG.debug( - "Delivering event {} to executor/handler {}: <json>{}</json>", + "Delivering [event={}] to executor/handler {}: <json>{}</json>", event.getName(), executorAndHandlers.getKey().getName(), TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n")); } else if (LOG.isDebugEnabled()) { - LOG.debug("Delivering event {} to executor/handler {}: {}", + LOG.debug("Delivering [event={}] to executor/handler {}: {}", event.getName(), executorAndHandlers.getKey().getName(), payload.getClass().getSimpleName()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index ed65ed3..7f88be4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -142,7 +142,6 @@ public class SCMNodeManager implements NodeManager { } } - /** * Returns all datanode that are in the given state. This function works by * taking a snapshot of the current collection and then returning the list @@ -154,7 +153,7 @@ public class SCMNodeManager implements NodeManager { @Override public List<DatanodeDetails> getNodes(NodeState nodestate) { return nodeStateManager.getNodes(nodestate).stream() - .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); + .map(node -> (DatanodeDetails) node).collect(Collectors.toList()); } /** @@ -165,7 +164,7 @@ public class SCMNodeManager implements NodeManager { @Override public List<DatanodeDetails> getAllNodes() { return nodeStateManager.getAllNodes().stream() - .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); + .map(node -> (DatanodeDetails) node).collect(Collectors.toList()); } /** @@ -229,11 +228,11 @@ public class SCMNodeManager implements NodeManager { * SCM. * * @param datanodeDetails - Send datanodeDetails with Node info. - * This function generates and assigns new datanode ID - * for the datanode. This allows SCM to be run independent - * of Namenode if required. - * @param nodeReport NodeReport. - * + * This function generates and assigns new datanode ID + * for the datanode. This allows SCM to be run + * independent + * of Namenode if required. + * @param nodeReport NodeReport. * @return SCMHeartbeatResponseProto */ @Override @@ -336,7 +335,7 @@ public class SCMNodeManager implements NodeManager { */ @Override public void processNodeReport(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport) { + NodeReportProto nodeReport) { if (LOG.isDebugEnabled()) { LOG.debug("Processing node report from [datanode={}]", datanodeDetails.getHostName()); @@ -361,6 +360,7 @@ public class SCMNodeManager implements NodeManager { /** * Returns the aggregated node stats. + * * @return the aggregated node stats. */ @Override @@ -379,6 +379,7 @@ public class SCMNodeManager implements NodeManager { /** * Return a map of node stats. + * * @return a map of individual node stats (live/stale but not dead). */ @Override @@ -386,7 +387,7 @@ public class SCMNodeManager implements NodeManager { final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>(); - final List<DatanodeInfo> healthyNodes = nodeStateManager + final List<DatanodeInfo> healthyNodes = nodeStateManager .getNodes(NodeState.HEALTHY); final List<DatanodeInfo> staleNodes = nodeStateManager .getNodes(NodeState.STALE); @@ -404,6 +405,7 @@ public class SCMNodeManager implements NodeManager { /** * Return the node stat of the specified datanode. + * * @param datanodeDetails - datanode ID. * @return node stat if it is live/stale, null if it is decommissioned or * doesn't exist. @@ -440,7 +442,7 @@ public class SCMNodeManager implements NodeManager { @Override public Map<String, Integer> getNodeCount() { Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); - for(NodeState state : NodeState.values()) { + for (NodeState state : NodeState.values()) { nodeCountMap.put(state.toString(), getNodeCount(state)); } return nodeCountMap; @@ -458,7 +460,7 @@ public class SCMNodeManager implements NodeManager { long ssdUsed = 0L; long ssdRemaining = 0L; - List<DatanodeInfo> healthyNodes = nodeStateManager + List<DatanodeInfo> healthyNodes = nodeStateManager .getNodes(NodeState.HEALTHY); List<DatanodeInfo> staleNodes = nodeStateManager .getNodes(NodeState.STALE); @@ -494,9 +496,9 @@ public class SCMNodeManager implements NodeManager { return nodeInfo; } - /** * Get set of pipelines a datanode is part of. + * * @param datanodeDetails - datanodeID * @return Set of PipelineID */ @@ -505,9 +507,9 @@ public class SCMNodeManager implements NodeManager { return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid()); } - /** * Add pipeline information in the NodeManager. + * * @param pipeline - Pipeline to be added */ @Override @@ -517,6 +519,7 @@ public class SCMNodeManager implements NodeManager { /** * Remove a pipeline information from the NodeManager. + * * @param pipeline - Pipeline to be removed */ @Override @@ -526,17 +529,18 @@ public class SCMNodeManager implements NodeManager { @Override public void addContainer(final DatanodeDetails datanodeDetails, - final ContainerID containerId) + final ContainerID containerId) throws NodeNotFoundException { nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId); } /** * Update set of containers available on a datanode. + * * @param datanodeDetails - DatanodeID - * @param containerIds - Set of containerIDs + * @param containerIds - Set of containerIDs * @throws NodeNotFoundException - if datanode is not known. For new datanode - * use addDatanodeInContainerMap call. + * use addDatanodeInContainerMap call. */ @Override public void setContainers(DatanodeDetails datanodeDetails, @@ -547,6 +551,7 @@ public class SCMNodeManager implements NodeManager { /** * Return set of containerIDs available on a datanode. + * * @param datanodeDetails - DatanodeID * @return - set of containerIDs */ @@ -570,7 +575,7 @@ public class SCMNodeManager implements NodeManager { * DATANODE_COMMAND to the Queue. * * @param commandForDatanode DatanodeCommand - * @param ignored publisher + * @param ignored publisher */ @Override public void onMessage(CommandForDatanode commandForDatanode, @@ -653,6 +658,7 @@ public class SCMNodeManager implements NodeManager { /** * Test utility to stop heartbeat check process. + * * @return ScheduledFuture of next scheduled check that got cancelled. */ @VisibleForTesting @@ -662,6 +668,7 @@ public class SCMNodeManager implements NodeManager { /** * Test utility to resume the paused heartbeat check process. + * * @return ScheduledFuture of the next scheduled check */ @VisibleForTesting @@ -671,6 +678,7 @@ public class SCMNodeManager implements NodeManager { /** * Test utility to get the count of skipped heartbeat check iterations. + * * @return count of skipped heartbeat check iterations */ @VisibleForTesting diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java index a23b876..1cc4deb 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; @@ -185,4 +186,18 @@ public abstract class BaseInsightPoint implements InsightPoint { metrics.add(performance); } + @Override + public boolean filterLog(Map<String, String> filters, String logLine) { + if (filters == null) { + return true; + } + boolean result = true; + for (Entry<String, String> entry : filters.entrySet()) { + if (!logLine.matches( + String.format(".*\\[%s=%s\\].*", entry.getKey(), entry.getValue()))) { + result = result & false; + } + } + return result; + } } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java index 1284cfa..57e1ddd 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.insight; import java.util.List; +import java.util.Map; /** * Definition of a specific insight points. @@ -44,6 +45,9 @@ public interface InsightPoint { */ List<Class> getConfigurationClasses(); - + /** + * Decide if the specific log should be displayed or not.. + */ + boolean filterLog(Map<String, String> filters, String logLine); } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java index 2e8787f..0a06fe7 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java @@ -23,8 +23,10 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -59,6 +61,10 @@ public class LogSubcommand extends BaseInsightSubCommand + "show more information / detailed message") private boolean verbose; + @CommandLine.Option(names = "-f", description = "Enable verbose mode to " + + "show more information / detailed message") + private Map<String, String> filters; + @Override public Void call() throws Exception { OzoneConfiguration conf = @@ -76,7 +82,8 @@ public class LogSubcommand extends BaseInsightSubCommand Set<Component> sources = loggers.stream().map(LoggerSource::getComponent) .collect(Collectors.toSet()); try { - streamLog(conf, sources, loggers); + streamLog(conf, sources, loggers, + (logLine) -> insight.filterLog(filters, logLine)); } finally { for (LoggerSource logger : loggers) { setLogLevel(conf, logger.getLoggerName(), logger.getComponent(), @@ -86,12 +93,20 @@ public class LogSubcommand extends BaseInsightSubCommand return null; } + /** + * Stream log from multiple endpoint. + * + * @param conf Configuration (to find the log endpoints) + * @param sources Components to connect to (like scm, om...) + * @param relatedLoggers loggers to display + * @param filter any additional filter + */ private void streamLog(OzoneConfiguration conf, Set<Component> sources, - List<LoggerSource> relatedLoggers) { + List<LoggerSource> relatedLoggers, Function<String, Boolean> filter) { List<Thread> loggers = new ArrayList<>(); for (Component sourceComponent : sources) { loggers.add(new Thread( - () -> streamLog(conf, sourceComponent, relatedLoggers))); + () -> streamLog(conf, sourceComponent, relatedLoggers, filter))); } for (Thread thread : loggers) { thread.start(); @@ -106,7 +121,7 @@ public class LogSubcommand extends BaseInsightSubCommand } private void streamLog(OzoneConfiguration conf, Component logComponent, - List<LoggerSource> loggers) { + List<LoggerSource> loggers, Function<String, Boolean> filter) { HttpClient client = HttpClientBuilder.create().build(); HttpGet get = new HttpGet(getHost(conf, logComponent) + "/logstream"); @@ -118,7 +133,8 @@ public class LogSubcommand extends BaseInsightSubCommand bufferedReader.lines() .filter(line -> { for (LoggerSource logger : loggers) { - if (line.contains(logger.getLoggerName())) { + if (line.contains(logger.getLoggerName()) && filter + .apply(line)) { return true; } } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java index b87955e..4113067 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.insight.datanode; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -72,4 +73,8 @@ public class RatisInsight extends BaseInsightPoint implements InsightPoint { return "More information about one ratis datanode ring."; } + @Override + public boolean filterLog(Map<String, String> filters, String logLine) { + return true; + } } diff --git a/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java new file mode 100644 index 0000000..42fdb39 --- /dev/null +++ b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.insight; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import static org.junit.Assert.*; +import org.junit.Test; + +/** + * Test common insight point utility methods. + */ +public class TestBaseInsightPoint { + + @Test + public void filterLog() { + + BaseInsightPoint insightPoint = new BaseInsightPoint() { + @Override + public String getDescription() { + return "test"; + } + }; + + //with simple filter + Map<String, String> filters = new HashMap<>(); + filters.put("datanode", "123"); + + Assert.assertTrue(insightPoint + .filterLog(filters, "This a log specific to [datanode=123]")); + + Assert.assertFalse(insightPoint + .filterLog(filters, "This a log specific to [datanode=234]")); + + //with empty filters + Assert.assertTrue(insightPoint + .filterLog(new HashMap<>(), "This a log specific to [datanode=234]")); + + //with multiple filters + filters.clear(); + filters.put("datanode", "123"); + filters.put("pipeline", "abcd"); + + Assert.assertFalse(insightPoint + .filterLog(filters, "This a log specific to [datanode=123]")); + + Assert.assertTrue(insightPoint + .filterLog(filters, + "This a log specific to [datanode=123] [pipeline=abcd]")); + + Assert.assertFalse(insightPoint + .filterLog(filters, + "This a log specific to [datanode=456] [pipeline=abcd]")); + + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org