/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.utils;

import org.apache.commons.io.IOUtils;
import org.apache.storm.StormTimer;
import org.apache.storm.daemon.DirectoryCleaner;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.jooq.lambda.Unchecked;
import org.jooq.lambda.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BinaryOperator;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toMap;
import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_AGE_MINS;
import static org.apache.storm.DaemonConfig.LOGVIEWER_CLEANUP_INTERVAL_SECS;
import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB;
import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB;

/**
 * Recurring task that removes old worker log directories and trims oversized ones.
 *
 * <p>Each run (1) deletes log directories that are older than the configured cutoff and whose
 * worker is not reported alive by the supervisor heartbeats, (2) asks {@link DirectoryCleaner}
 * to trim each worker directory to the per-worker size limit, and (3) asks it to trim all
 * worker directories together to the global size limit.
 */
public class LogCleaner implements Runnable, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
    public static final String WORKER_YAML = "worker.yaml";

    private static final long BYTES_PER_MB = 1024L * 1024L;
    private static final long MILLIS_PER_SECOND = 1000L;
    private static final long MILLIS_PER_MINUTE = 60L * 1000L;

    /** Worker id used for log dirs whose metadata file is missing or carries no worker id. */
    private static final String UNKNOWN_WORKER_ID = "";

    private final Map<String, Object> stormConf;
    private final Integer intervalSecs;
    private final String logRootDir;
    private StormTimer logviewerCleanupTimer;
    private final long maxSumWorkerLogsSizeMb;
    private final long maxPerWorkerLogsSizeMb;

    /**
     * Reads the cleanup interval and the size limits from the given configuration.
     *
     * @param stormConf the storm configuration; the cleanup interval may be absent, in which
     *                  case {@link #start()} does not schedule anything
     */
    public LogCleaner(Map<String, Object> stormConf) {
        this.stormConf = stormConf;
        this.intervalSecs = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_INTERVAL_SECS), null);
        this.logRootDir = ConfigUtils.workerArtifactsRoot(stormConf);

        this.maxSumWorkerLogsSizeMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB));
        long configuredPerWorkerMb = ObjectReader.getInt(stormConf.get(LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB));
        // A single worker directory may never use more than half of the global budget.
        this.maxPerWorkerLogsSizeMb = Math.min(configuredPerWorkerMb, (long) (maxSumWorkerLogsSizeMb * 0.5));

        LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB",
                maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);
    }

    /**
     * Schedules this cleaner on a recurring timer, or logs a warning and does nothing when no
     * cleanup interval is configured.
     */
    public void start() {
        if (intervalSecs != null) {
            LOG.debug("starting log cleanup thread at interval: {}", intervalSecs);

            logviewerCleanupTimer = new StormTimer("logviewer-cleanup", (t, e) -> {
                LOG.error("Error when doing logs cleanup", e);
                Utils.exitProcess(20, "Error when doing log cleanup");
            });

            logviewerCleanupTimer.scheduleRecurring(0, intervalSecs, this);
        } else {
            LOG.warn("The interval for log cleanup is not set. Skip starting log cleanup thread.");
        }
    }

    /**
     * Closes the cleanup timer if {@link #start()} created one.
     */
    @Override
    public void close() {
        if (logviewerCleanupTimer != null) {
            try {
                logviewerCleanupTimer.close();
            } catch (Exception ex) {
                throw Utils.wrapInRuntime(ex);
            }
        }
    }

    /**
     * Delete old log dirs for which the workers are no longer alive, then enforce the
     * per-worker and global size limits. Any failure is logged and never propagated.
     */
    @Override
    public void run() {
        try {
            int nowSecs = Time.currentTimeSecs();
            // Widen before multiplying: epoch seconds * 1000 does not fit in an int.
            Set<File> oldLogDirs = selectDirsForCleanup(nowSecs * MILLIS_PER_SECOND, logRootDir);

            DirectoryCleaner cleaner = new DirectoryCleaner();
            SortedSet<File> deadWorkerDirs = getDeadWorkerDirs(nowSecs, oldLogDirs);

            LOG.debug("log cleanup: now={} old log dirs {} dead worker dirs {}", nowSecs,
                    oldLogDirs.stream().map(File::getName).collect(joining(",")),
                    deadWorkerDirs.stream().map(File::getName).collect(joining(",")));

            for (File dir : deadWorkerDirs) {
                // A failure on one directory must not stop the cleanup of the others.
                try {
                    String path = dir.getCanonicalPath();
                    LOG.info("Cleaning up: Removing {}", path);
                    Utils.forceDelete(path);
                    cleanupEmptyTopoDirectory(dir);
                } catch (Exception ex) {
                    LOG.error(ex.getMessage(), ex);
                }
            }

            File rootDir = new File(logRootDir);
            perWorkerDirCleanup(rootDir, maxPerWorkerLogsSizeMb * BYTES_PER_MB, cleaner);
            globalLogCleanup(rootDir, maxSumWorkerLogsSizeMb * BYTES_PER_MB, cleaner);
        } catch (Exception ex) {
            LOG.error("Exception while cleaning up old log.", ex);
        }
    }

    /**
     * Delete the oldest files in each overloaded worker log dir.
     *
     * @param rootDir root directory containing the topology directories
     * @param size    size limit in bytes applied to each worker directory individually
     * @param cleaner cleaner that performs the deletion
     */
    private void perWorkerDirCleanup(File rootDir, long size, DirectoryCleaner cleaner) {
        WorkerLogs.getAllWorkerDirs(rootDir).forEach(Unchecked.consumer(dir -> {
            cleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir), size, true, null);
        }));
    }

    /**
     * Delete the oldest files in overloaded worker-artifacts globally.
     *
     * @param rootDir root directory containing the topology directories
     * @param size    size limit in bytes applied to all worker directories together
     * @param cleaner cleaner that performs the deletion; it is also handed the canonical paths
     *                of the directories that belong to alive workers
     */
    private void globalLogCleanup(File rootDir, long size, DirectoryCleaner cleaner) throws Exception {
        List<File> workerDirs = new ArrayList<>(WorkerLogs.getAllWorkerDirs(rootDir));
        Set<String> aliveWorkerDirs = new HashSet<>(getAliveWorkerDirs(rootDir));

        cleaner.deleteOldestWhileTooLarge(workerDirs, size, false, aliveWorkerDirs);
    }

    /**
     * Return a sorted set of canonical paths of the log dirs written by workers that are now active.
     */
    private SortedSet<String> getAliveWorkerDirs(File rootDir) throws Exception {
        Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());

        SortedSet<String> aliveDirs = new TreeSet<>();
        for (File logDir : WorkerLogs.getAllWorkerDirs(rootDir)) {
            if (aliveIds.contains(identifyWorkerId(logDir))) {
                aliveDirs.add(logDir.getCanonicalPath());
            }
        }
        return aliveDirs;
    }

    /**
     * Resolve the worker id recorded in the metadata file of a worker log dir.
     *
     * <p>Every directory is resolved on its own, so any number of directories without usable
     * metadata can coexist; they all resolve to {@link #UNKNOWN_WORKER_ID} and therefore never
     * match an alive worker.
     *
     * @return the recorded worker id, or {@link #UNKNOWN_WORKER_ID} when the metadata file is
     *         missing or holds no worker id; never {@code null}
     */
    private String identifyWorkerId(File logDir) throws IOException {
        Optional<File> metaFile = getMetadataFileForWorkerLogDir(logDir);
        if (!metaFile.isPresent()) {
            return UNKNOWN_WORKER_ID;
        }

        String workerId = getWorkerIdFromMetadataFile(metaFile.get().getCanonicalPath());
        return workerId != null ? workerId : UNKNOWN_WORKER_ID;
    }

    /**
     * Locate the {@value #WORKER_YAML} metadata file of a worker log dir, warning when it is absent.
     */
    private Optional<File> getMetadataFileForWorkerLogDir(File logDir) throws IOException {
        File metaFile = new File(logDir, WORKER_YAML);
        if (metaFile.exists()) {
            return Optional.of(metaFile);
        } else {
            LOG.warn("Could not find {} to clean up for {}", metaFile.getCanonicalPath(), logDir);
            return Optional.empty();
        }
    }

    /**
     * Read the "worker-id" entry of a metadata file.
     *
     * @return the worker id, or {@code null} when the file yields no map or no such entry
     */
    private String getWorkerIdFromMetadataFile(String metaFile) {
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(metaFile);
        if (map == null) {
            return null;
        }
        return ObjectReader.getString(map.get("worker-id"), null);
    }

    /**
     * Return the ids of all workers that have a heartbeat which has not timed out at {@code nowSecs}.
     */
    private Set<String> getAliveIds(int nowSecs) throws Exception {
        return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
                .filter(entry -> Objects.nonNull(entry.getValue())
                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
                .map(Map.Entry::getKey)
                .collect(toCollection(TreeSet::new));
    }

    /**
     * Delete the topo dir if it contains zero port dirs.
     *
     * @param dir a (just deleted) port dir; its parent is the topo dir that is inspected
     */
    private void cleanupEmptyTopoDirectory(File dir) throws IOException {
        File topoDir = dir.getParentFile();
        if (topoDir == null) {
            return;
        }

        // listFiles() yields null when the directory is gone or unreadable; nothing to delete then.
        File[] remaining = topoDir.listFiles();
        if (remaining != null && remaining.length == 0) {
            Utils.forceDelete(topoDir.getCanonicalPath());
        }
    }

    /**
     * Return a sorted set of java.io.Files that were written by workers that are now dead.
     *
     * @param nowSecs current time in seconds, used to judge heartbeat timeouts
     * @param logDirs candidate worker log dirs
     */
    private SortedSet<File> getDeadWorkerDirs(int nowSecs, Set<File> logDirs) throws Exception {
        SortedSet<File> deadDirs = new TreeSet<>();
        if (logDirs.isEmpty()) {
            return deadDirs;
        }

        Set<String> aliveIds = getAliveIds(nowSecs);
        for (File logDir : logDirs) {
            if (!aliveIds.contains(identifyWorkerId(logDir))) {
                deadDirs.add(logDir);
            }
        }
        return deadDirs;
    }

    /**
     * Collect every non-file entry two levels below {@code rootDir} that was last modified at or
     * before the cleanup cutoff.
     *
     * @param nowMillis current time in milliseconds
     * @param rootDir   root directory containing the topology directories
     */
    private Set<File> selectDirsForCleanup(long nowMillis, String rootDir) {
        FileFilter fileFilter = mkFileFilterForLogCleanup(nowMillis);

        File[] topoDirs = new File(rootDir).listFiles();
        if (topoDirs == null) {
            // Root is missing, unreadable or not a directory: nothing to clean.
            return new TreeSet<>();
        }

        return Arrays.stream(topoDirs)
                .map(topoDir -> topoDir.listFiles(fileFilter))
                // listFiles() yields null for plain files sitting next to the topo dirs.
                .filter(Objects::nonNull)
                .flatMap(Arrays::stream)
                .collect(toCollection(TreeSet::new));
    }

    /**
     * Build a filter accepting non-file entries whose newest content is not newer than the cutoff.
     */
    private FileFilter mkFileFilterForLogCleanup(long nowMillis) {
        final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
        return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
    }

    /**
     * Return the last modified time for all log files in a worker's log dir.
     * Using stream rather than File.listFiles is to avoid large mem usage
     * when a directory has too many files.
     *
     * @return the newest modification time among the dir and its entries; the dir's own
     *         modification time when the dir cannot be listed
     */
    private long lastModifiedTimeWorkerLogdir(File logDir) {
        long dirModified = logDir.lastModified();

        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(logDir.toPath())) {
            long latest = dirModified;
            for (Path entry : dirStream) {
                latest = Math.max(latest, entry.toFile().lastModified());
            }
            return latest;
        } catch (Exception ex) {
            LOG.error(ex.getMessage(), ex);
            return dirModified;
        }
    }

    /**
     * Return the timestamp before which a log dir counts as old.
     *
     * @param nowMillis current time in milliseconds
     * @return {@code nowMillis} minus the configured cleanup age, which is given in minutes
     */
    private long cleanupCutoffAgeMillis(long nowMillis) {
        long cleanupAgeMins = ObjectReader.getInt(stormConf.get(LOGVIEWER_CLEANUP_AGE_MINS));
        return nowMillis - cleanupAgeMins * MILLIS_PER_MINUTE;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.utils;

import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;

/**
 * Serves worker and daemon log files as downloads.
 */
public class LogFileDownloader {

    private final String logRoot;
    private final String daemonLogRoot;
    private final ResourceAuthorizer resourceAuthorizer;

    /**
     * Creates a downloader.
     *
     * @param logRoot            root directory of the worker logs
     * @param daemonLogRoot      root directory of the daemon logs
     * @param resourceAuthorizer authorizer consulted for worker log files
     */
    public LogFileDownloader(String logRoot, String daemonLogRoot, ResourceAuthorizer resourceAuthorizer) {
        this.logRoot = logRoot;
        this.daemonLogRoot = daemonLogRoot;
        this.resourceAuthorizer = resourceAuthorizer;
    }

    /**
     * Builds the download response for a log file.
     *
     * @param fileName path of the requested file, relative to the selected log root
     * @param user     name of the requesting user
     * @param isDaemon {@code true} to resolve against the daemon log root (no per-user
     *                 authorization is applied), {@code false} for the worker log root
     * @return the file as an attachment; the "page not found" response when the file does not
     *         exist or resolves outside the selected root; the "unauthorized user" response
     *         when the user may not read a worker log
     * @throws IOException if a path cannot be canonicalized or the file cannot be opened
     */
    public Response downloadFile(String fileName, String user, boolean isDaemon) throws IOException {
        File rootDir = new File(isDaemon ? daemonLogRoot : logRoot).getCanonicalFile();
        File file = new File(rootDir, fileName).getCanonicalFile();

        // fileName comes from the request: canonicalization has resolved any "../" parts, so a
        // result outside the root is a traversal attempt and is reported like a missing file.
        boolean insideRoot = file.toPath().startsWith(rootDir.toPath());
        if (!insideRoot || !file.exists()) {
            return LogviewerResponseBuilder.buildResponsePageNotFound();
        }

        if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(fileName, user)) {
            return LogviewerResponseBuilder.buildDownloadFile(file);
        }
        return LogviewerResponseBuilder.buildResponseUnautohrizedUser(user);
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.utils;

import com.google.common.io.ByteStreams;
import org.apache.storm.daemon.common.JsonResponseBuilder;
import org.apache.storm.ui.UIHelpers;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import static j2html.TagCreator.body;
import static j2html.TagCreator.h2;
import static javax.ws.rs.core.Response.Status.OK;
import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;

/**
 * Static factory methods for the HTTP responses produced by the logviewer.
 */
public class LogviewerResponseBuilder {

    private LogviewerResponseBuilder() {
    }

    /**
     * Builds a 200 response carrying the given HTML content.
     */
    public static Response buildSuccessHtmlResponse(String content) {
        return Response.status(OK).entity(content)
                .type(MediaType.TEXT_HTML_TYPE).build();
    }

    /**
     * Builds a JSON response for {@code entity} with the given callback and the success
     * headers derived from {@code origin}.
     */
    public static Response buildSuccessJsonResponse(Object entity, String callback, String origin) {
        return new JsonResponseBuilder().setData(entity).setCallback(callback)
                .setHeaders(LogviewerResponseBuilder.getHeadersForSuccessResponse(origin)).build();
    }

    /**
     * Builds a 200 response that streams {@code file} as an attachment.
     *
     * @param file the file to send
     * @throws IOException if the file cannot be opened
     */
    public static Response buildDownloadFile(File file) throws IOException {
        // The stream must stay open after this method returns: it is read when the response
        // entity is written. The StreamingOutput closes it once the copy is done.
        InputStream is = new FileInputStream(file);
        return Response.status(OK)
                .entity(wrapWithStreamingOutput(is))
                .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
                .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
                .build();
    }

    /**
     * Builds the HTML response telling {@code user} that access is not authorized.
     * The response status is 200, exactly as with {@link #buildResponseUnautohrizedUser(String)}.
     */
    public static Response buildResponseUnauthorizedUser(String user) {
        String entity = buildUnauthorizedUserHtml(user);
        return Response.status(OK)
                .entity(entity)
                .type(MediaType.TEXT_HTML_TYPE)
                .build();
    }

    /**
     * Misspelled original name of {@link #buildResponseUnauthorizedUser(String)}, kept so that
     * existing callers continue to compile.
     */
    public static Response buildResponseUnautohrizedUser(String user) {
        return buildResponseUnauthorizedUser(user);
    }

    /**
     * Builds a 404 response with the body "Page not found".
     */
    public static Response buildResponsePageNotFound() {
        return Response.status(404)
                .entity("Page not found")
                .type(MediaType.TEXT_HTML_TYPE)
                .build();
    }

    /**
     * Builds a 401 JSON response for an unauthorized user.
     */
    public static Response buildUnauthorizedUserJsonResponse(String user, String callback) {
        return new JsonResponseBuilder().setData(UIHelpers.unauthorizedUserJson(user))
                .setCallback(callback).setStatus(401).build();
    }

    /**
     * Builds a 500 JSON response describing the given exception.
     */
    public static Response buildExceptionJsonResponse(Exception ex, String callback) {
        return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(ex))
                .setCallback(callback).setStatus(500).build();
    }

    private static Map<String, Object> getHeadersForSuccessResponse(String origin) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("Access-Control-Allow-Origin", origin);
        headers.put("Access-Control-Allow-Credentials", "true");
        return headers;
    }

    private static String buildUnauthorizedUserHtml(String user) {
        String content = "User '" + escapeHtml(user) + "' is not authorized.";
        return body(h2(content)).render();
    }

    /**
     * Wraps an input stream into a {@link StreamingOutput} that copies it to the response
     * through a buffered stream, flushes, and then closes the input stream so the underlying
     * file descriptor is released even when the copy fails.
     */
    private static StreamingOutput wrapWithStreamingOutput(final InputStream inputStream) {
        return os -> {
            try (InputStream in = inputStream) {
                OutputStream wrappedOutputStream = os;
                if (!(os instanceof BufferedOutputStream)) {
                    wrappedOutputStream = new BufferedOutputStream(os);
                }

                ByteStreams.copy(in, wrappedOutputStream);

                wrappedOutputStream.flush();
            }
        };
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.utils;

import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Utils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.storm.DaemonConfig.UI_FILTER;

/**
 * Decides whether a user may read a given worker log file.
 */
public class ResourceAuthorizer {

    private final Map<String, Object> stormConf;
    private final IGroupMappingServiceProvider groupMappingServiceProvider;

    /**
     * Creates an authorizer backed by the group mapping plugin configured in {@code stormConf}.
     */
    public ResourceAuthorizer(Map<String, Object> stormConf) {
        this.stormConf = stormConf;
        this.groupMappingServiceProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(stormConf);
    }

    /**
     * Checks whether {@code user} may access {@code fileName}.
     *
     * @return {@code true} when no UI filter is configured, or when the user is authorized
     *         either by name or through one of the user's groups
     */
    public boolean isUserAllowedToAccessFile(String fileName, String user) {
        return isUiFilterNotSet() || isAuthorizedLogUser(user, fileName);
    }

    private boolean isUiFilterNotSet() {
        return StringUtils.isBlank(ObjectReader.getString(stormConf.get(UI_FILTER), null));
    }

    /**
     * Checks the user against the configured log users/admins and the file's own whitelist,
     * and the user's groups against the configured log groups and the file's group whitelist.
     *
     * @return {@code false} for an empty user or file name, or when the file has no whitelist
     *         metadata; otherwise whether the user or any of the user's groups is permitted
     */
    private boolean isAuthorizedLogUser(String user, String fileName) {
        if (StringUtils.isEmpty(user) || StringUtils.isEmpty(fileName)) {
            return false;
        }

        // Read the metadata once and reuse it for both the null check and the whitelists.
        LogUserGroupWhitelist whitelist = getLogUserGroupWhitelist(fileName);
        if (whitelist == null) {
            return false;
        }

        List<String> logsUsers = new ArrayList<>();
        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_USERS)));
        logsUsers.addAll(ObjectReader.getStrings(stormConf.get(Config.NIMBUS_ADMINS)));
        logsUsers.addAll(whitelist.getUserWhitelist());
        if (logsUsers.contains(user)) {
            return true;
        }

        List<String> logsGroups = new ArrayList<>();
        logsGroups.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_GROUPS)));
        logsGroups.addAll(whitelist.getGroupWhitelist());

        // Authorized through a group as soon as the user belongs to at least one permitted group.
        Set<String> groups = getUserGroups(user);
        return !Sets.intersection(groups, new HashSet<>(logsGroups)).isEmpty();
    }

    /**
     * Reads the user and group whitelists from the metadata file belonging to a log file.
     *
     * @return the whitelists, or {@code null} when the metadata file yields no content
     */
    public LogUserGroupWhitelist getLogUserGroupWhitelist(String fileName) {
        File wlFile = ServerConfigUtils.getLogMetaDataFile(fileName);
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(wlFile.getAbsolutePath());

        if (map == null) {
            return null;
        }

        List<String> logsUsers = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_USERS));
        List<String> logsGroups = ObjectReader.getStrings(map.get(DaemonConfig.LOGS_GROUPS));
        return new LogUserGroupWhitelist(new HashSet<>(logsUsers), new HashSet<>(logsGroups));
    }

    /**
     * Looks up the groups of a user through the group mapping plugin.
     *
     * @return the user's groups, or an empty set for an empty user name
     * @throws RuntimeException wrapping the {@link IOException} raised by the plugin
     */
    private Set<String> getUserGroups(String user) {
        if (StringUtils.isEmpty(user)) {
            return new HashSet<>();
        }

        try {
            return groupMappingServiceProvider.getGroups(user);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * The users and groups that a log file's metadata permits to read it.
     */
    public static class LogUserGroupWhitelist {

        private final Set<String> userWhitelist;
        private final Set<String> groupWhitelist;

        public LogUserGroupWhitelist(Set<String> userWhitelist, Set<String> groupWhitelist) {
            this.userWhitelist = userWhitelist;
            this.groupWhitelist = groupWhitelist;
        }

        public Set<String> getUserWhitelist() {
            return userWhitelist;
        }

        public Set<String> getGroupWhitelist() {
            return groupWhitelist;
        }

    }
}
package org.apache.storm.daemon.wip.logviewer.utils;

import org.apache.storm.daemon.DirectoryCleaner;
import org.apache.storm.utils.Utils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toCollection;
import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;

/**
 * Static helpers for locating worker log directories and files below the log root.
 */
public class WorkerLogs {

    private WorkerLogs() {
    }

    /**
     * Return the path of the worker log with the format of topoId/port/worker.log.*
     *
     * @param file a worker log file
     * @return the last three components of the file's canonical path, joined by the path separator
     * @throws RuntimeException wrapping the {@link IOException} raised while canonicalizing
     */
    public static String getTopologyPortWorkerLog(File file) {
        try {
            // String.split() takes a regular expression; quote the separator so that a
            // backslash separator is matched literally instead of being parsed as a pattern.
            String[] splitted = file.getCanonicalPath().split(Pattern.quote(Utils.FILE_PATH_SEPARATOR));
            List<String> split = takeLast(Arrays.asList(splitted), 3);

            return String.join(Utils.FILE_PATH_SEPARATOR, split);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Collect the files of every worker directory below the given root.
     *
     * @param logDir root directory containing the topology directories
     * @return the files reported by {@link DirectoryCleaner#getFilesForDir} for each worker dir
     */
    public static List<File> getAllLogsForRootDir(File logDir) throws IOException {
        List<File> files = new ArrayList<>();
        for (File portDir : getAllWorkerDirs(logDir)) {
            files.addAll(DirectoryCleaner.getFilesForDir(portDir));
        }

        return files;
    }

    /**
     * Return every entry found two levels below {@code rootDir} (root/topology/entry).
     *
     * @param rootDir root directory containing the topology directories
     * @return a sorted set of the entries; empty when {@code rootDir} cannot be listed.
     *         First-level entries that cannot be listed contribute nothing.
     */
    public static Set<File> getAllWorkerDirs(File rootDir) {
        File[] rootDirFiles = rootDir.listFiles();
        if (rootDirFiles == null) {
            return new TreeSet<>();
        }

        return Arrays.stream(rootDirFiles).flatMap(topoDir -> {
            File[] topoFiles = topoDir.listFiles();
            return topoFiles != null ? Arrays.stream(topoFiles) : Stream.empty();
        }).collect(toCollection(TreeSet::new));
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.wip.logviewer.webapp;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.storm.daemon.common.AuthorizationExceptionMapper;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler;
import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler;
import org.apache.storm.daemon.wip.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IHttpCredentialsPlugin;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
import java.io.File;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.storm.DaemonConfig.LOGVIEWER_APPENDER_NAME;

/**
 * JAX-RS application of the logviewer. The configuration must be supplied through
 * {@link #setup(Map)} before an instance is created, because the constructor reads it.
 */
@ApplicationPath("")
public class LogviewerApplication extends Application {
    private static Map<String, Object> stormConf;
    private final Set<Object> singletons = new HashSet<Object>();

    /**
     * Constructor. Resolves the worker and daemon log roots, creates the request handlers and
     * registers the logviewer resource and the authorization exception mapper as singletons.
     */
    public LogviewerApplication() {
        final String workerLogRoot = ConfigUtils.workerArtifactsRoot(stormConf);
        final String appenderName = ObjectReader.getString(stormConf.get(LOGVIEWER_APPENDER_NAME));
        final String daemonLogRoot = logRootDir(appenderName);

        final ResourceAuthorizer authorizer = new ResourceAuthorizer(stormConf);

        final LogviewerLogPageHandler pageHandler =
                new LogviewerLogPageHandler(workerLogRoot, daemonLogRoot, authorizer);
        final LogviewerProfileHandler profileHandler =
                new LogviewerProfileHandler(workerLogRoot, authorizer);
        final LogviewerLogDownloadHandler downloadHandler =
                new LogviewerLogDownloadHandler(workerLogRoot, daemonLogRoot, authorizer);
        final LogviewerLogSearchHandler searchHandler =
                new LogviewerLogSearchHandler(stormConf, workerLogRoot, daemonLogRoot, authorizer);
        final IHttpCredentialsPlugin credentialsPlugin = AuthUtils.GetUiHttpCredentialsPlugin(stormConf);

        LogviewerResource resource = new LogviewerResource(pageHandler, profileHandler, downloadHandler,
                searchHandler, credentialsPlugin);
        singletons.add(resource);
        singletons.add(new AuthorizationExceptionMapper());
    }

    @Override
    public Set<Object> getSingletons() {
        return singletons;
    }

    /**
     * Stores the configuration that instances created afterwards will read.
     */
    public static void setup(Map<String, Object> stormConf) {
        LogviewerApplication.stormConf = stormConf;
    }

    /**
     * Given an appender name, as configured, get the parent directory of the appender's log file.
     *
     * @throws RuntimeException when the name is null, no appender has that name, or the
     *                          appender is not a {@link RollingFileAppender}
     */
    private String logRootDir(String appenderName) {
        LoggerContext loggerContext = (LoggerContext) LogManager.getContext();
        Appender appender = loggerContext.getConfiguration().getAppender(appenderName);

        // instanceof is false for null, so a missing appender is rejected here as well.
        if (appenderName == null || !(appender instanceof RollingFileAppender)) {
            throw new RuntimeException("Log viewer could not find configured appender, or the appender is not a FileAppender. "
                    + "Please check that the appender name configured in storm and log4j agree.");
        }

        String logFileName = ((RollingFileAppender) appender).getFileName();
        return new File(logFileName).getParent();
    }

}
- */ - -package org.apache.storm.daemon.wip.logviewer.webapp; - -import com.codahale.metrics.Meter; -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.storm.daemon.common.JsonResponseBuilder; -import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogDownloadHandler; -import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogPageHandler; -import org.apache.storm.daemon.wip.logviewer.handler.LogviewerLogSearchHandler; -import org.apache.storm.daemon.wip.logviewer.handler.LogviewerProfileHandler; -import org.apache.storm.metric.StormMetricsRegistry; -import org.apache.storm.security.auth.IHttpCredentialsPlugin; -import org.apache.storm.ui.InvalidRequestException; -import org.apache.storm.ui.UIHelpers; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.net.URLDecoder; -import java.util.Map; - -@Path("/") -public class LogviewerResource { - private static final Logger LOG = LoggerFactory.getLogger(LogviewerResource.class); - - private static final Meter meterLogPageHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-log-page-http-requests"); - private static final Meter meterDaemonLogPageHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-daemonlog-page-http-requests"); - private static final Meter meterDownloadLogFileHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-download-log-file-http-requests"); - private static final Meter meterDownloadLogDaemonFileHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-download-log-daemon-file-http-requests"); - private static final Meter meterListLogsHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests"); - - private final 
LogviewerLogPageHandler logviewer; - private final LogviewerProfileHandler profileHandler; - private final LogviewerLogDownloadHandler logDownloadHandler; - private final LogviewerLogSearchHandler logSearchHandler; - private final IHttpCredentialsPlugin httpCredsHandler; - - public LogviewerResource(LogviewerLogPageHandler logviewerParam, LogviewerProfileHandler profileHandler, - LogviewerLogDownloadHandler logDownloadHandler, LogviewerLogSearchHandler logSearchHandler, - IHttpCredentialsPlugin httpCredsHandler) { - this.logviewer = logviewerParam; - this.profileHandler = profileHandler; - this.logDownloadHandler = logDownloadHandler; - this.logSearchHandler = logSearchHandler; - this.httpCredsHandler = httpCredsHandler; - } - - @GET - @Path("/log") - public Response log(@Context HttpServletRequest request) throws IOException { - meterLogPageHttpRequests.mark(); - - try { - String user = httpCredsHandler.getUserName(request); - Integer start = request.getParameter("start") != null ? parseIntegerFromMap(request.getParameterMap(), "start") : null; - Integer length = request.getParameter("length") != null ? parseIntegerFromMap(request.getParameterMap(), "length") : null; - String decodedFileName = URLDecoder.decode(request.getParameter("file")); - String grep = request.getParameter("grep"); - return logviewer.logPage(decodedFileName, start, length, grep, user); - } catch (InvalidRequestException e) { - LOG.error(e.getMessage(), e); - return Response.status(400).entity(e.getMessage()).build(); - } - } - - @GET - @Path("/daemonlog") - public Response daemonLog(@Context HttpServletRequest request) throws IOException { - meterDaemonLogPageHttpRequests.mark(); - - try { - String user = httpCredsHandler.getUserName(request); - Integer start = request.getParameter("start") != null ? parseIntegerFromMap(request.getParameterMap(), "start") : null; - Integer length = request.getParameter("length") != null ? 
parseIntegerFromMap(request.getParameterMap(), "length") : null; - String decodedFileName = URLDecoder.decode(request.getParameter("file")); - String grep = request.getParameter("grep"); - return logviewer.daemonLogPage(decodedFileName, start, length, grep, user); - } catch (InvalidRequestException e) { - LOG.error(e.getMessage(), e); - return Response.status(400).entity(e.getMessage()).build(); - } - } - - @GET - @Path("/searchLogs") - public Response searchLogs(@Context HttpServletRequest request) throws IOException { - String user = httpCredsHandler.getUserName(request); - String topologyId = request.getParameter("topoId"); - String portStr = request.getParameter("port"); - String callback = request.getParameter("callback"); - String origin = request.getHeader("Origin"); - - return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId, callback, origin); - } - - @GET - @Path("/listLogs") - public Response listLogs(@Context HttpServletRequest request) throws IOException { - meterListLogsHttpRequests.mark(); - - String user = httpCredsHandler.getUserName(request); - String topologyId = request.getParameter("topoId"); - String portStr = request.getParameter("port"); - String callback = request.getParameter("callback"); - String origin = request.getHeader("Origin"); - - return logviewer.listLogFiles(user, portStr != null ? 
Integer.parseInt(portStr) : null, topologyId, callback, origin); - } - - @GET - @Path("/dumps/{topo-id}/{host-port}") - public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort, - @Context HttpServletRequest request) throws IOException { - String user = httpCredsHandler.getUserName(request); - return profileHandler.listDumpFiles(topologyId, hostPort, user); - } - - @GET - @Path("/dumps/{topo-id}/{host-port}/{filename}") - public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort, - @PathParam("filename") String fileName, @Context HttpServletRequest request) throws IOException { - String user = httpCredsHandler.getUserName(request); - return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user); - } - - @GET - @Path("/download") - public Response downloadLogFile(@Context HttpServletRequest request) throws IOException { - meterDownloadLogFileHttpRequests.mark(); - - String user = httpCredsHandler.getUserName(request); - String file = request.getParameter("file"); - String decodedFileName = URLDecoder.decode(file); - return logDownloadHandler.downloadLogFile(decodedFileName, user); - } - - @GET - @Path("/daemondownload") - public Response downloadDaemonLogFile(@Context HttpServletRequest request) throws IOException { - meterDownloadLogDaemonFileHttpRequests.mark(); - - String user = httpCredsHandler.getUserName(request); - String file = request.getParameter("file"); - String decodedFileName = URLDecoder.decode(file); - return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user); - } - - @GET - @Path("/search/{file}") - public Response search(@PathParam("file") String file, @Context HttpServletRequest request) throws IOException { - String user = httpCredsHandler.getUserName(request); - boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes"); - String decodedFileName = URLDecoder.decode(file); - String 
searchString = request.getParameter("search-string"); - String numMatchesStr = request.getParameter("num-matches"); - String startByteOffset = request.getParameter("start-byte-offset"); - String callback = request.getParameter("callback"); - String origin = request.getHeader("Origin"); - - try { - return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr, - startByteOffset, callback, origin); - } catch (InvalidRequestException e) { - LOG.error(e.getMessage(), e); - return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e)).setCallback(callback) - .setStatus(400).build(); - } - } - - @GET - @Path("/deepSearch/{topoId}") - public Response deepSearch(@PathParam("topoId") String topologyId, - @Context HttpServletRequest request) throws IOException { - String user = httpCredsHandler.getUserName(request); - String searchString = request.getParameter("search-string"); - String numMatchesStr = request.getParameter("num-matches"); - String portStr = request.getParameter("port"); - String startFileOffset = request.getParameter("start-file-offset"); - String startByteOffset = request.getParameter("start-byte-offset"); - String searchArchived = request.getParameter("search-archived"); - String callback = request.getParameter("callback"); - String origin = request.getHeader("Origin"); - - return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr, - startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin); - } - - private int parseIntegerFromMap(Map map, String parameterKey) throws InvalidRequestException { - try { - return Integer.parseInt(((String[]) map.get(parameterKey))[0]); - } catch (NumberFormatException ex) { - throw new InvalidRequestException("Could not make an integer out of the query parameter '" - + parameterKey + "'", ex); - } - } - -} 
// storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/LogviewerTest.java
// (listed as a new file in the enclosing diff)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.logviewer;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class LogviewerTest {
    /** Creates a mocked Path whose toFile() returns the given file. */
    private Path mkMockPath(File file) {
        Path mockPath = mock(Path.class);
        when(mockPath.toFile()).thenReturn(file);
        return mockPath;
    }

    /** Wraps a list of paths in a DirectoryStream that iterates over the list; close() does nothing. */
    private DirectoryStream<Path> mkDirectoryStream(List<Path> listOfPaths) {
        return new DirectoryStream<Path>() {
            @Override
            public Iterator<Path> iterator() {
                return listOfPaths.iterator();
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

}

// storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandlerTest.java
// (listed as a new file in the enclosing diff)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.logviewer.handler;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
import org.apache.storm.utils.Utils;
import org.assertj.core.util.Lists;
import org.junit.Test;

import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class LogviewerLogPageHandlerTest {

    /**
     * list-log-files filter selects the correct log files to return.
     */
    @Test
    public void testListLogFiles() throws IOException {
        FileAttribute<?>[] attrs = new FileAttribute<?>[0];
        String rootPath = Files.createTempDirectory("workers-artifacts", attrs).toFile().getCanonicalPath();

        // The temporary tree is removed even when the handler or fixture setup throws.
        try {
            createWorkerLog(rootPath, "topoA", "1111");
            createWorkerLog(rootPath, "topoA", "2222");
            createWorkerLog(rootPath, "topoB", "1111");

            String origin = "www.origin.server.net";

            LogviewerLogPageHandler handler = new LogviewerLogPageHandler(rootPath, null,
                    new ResourceAuthorizer(Utils.readStormConfig()));

            // NOTE(review): the expected entries now name the port directories actually created above
            // ("1111"/"2222"). The previous literals ("port1"/"port2") never existed on disk and only
            // passed because the entity comparison compared the expected response with itself.
            // Confirm the "<topology>/<port>/<file>" format and ordering against listLogFiles.
            Response expectedAll = LogviewerResponseBuilder.buildSuccessJsonResponse(
                    Lists.newArrayList("topoA/1111/worker.log", "topoA/2222/worker.log", "topoB/1111/worker.log"),
                    null,
                    origin
            );

            Response expectedFilterPort = LogviewerResponseBuilder.buildSuccessJsonResponse(
                    Lists.newArrayList("topoA/1111/worker.log", "topoB/1111/worker.log"),
                    null,
                    origin
            );

            Response expectedFilterTopoId = LogviewerResponseBuilder.buildSuccessJsonResponse(
                    Lists.newArrayList("topoB/1111/worker.log"),
                    null,
                    origin
            );

            Response returnedAll = handler.listLogFiles("user", null, null, null, origin);
            Response returnedFilterPort = handler.listLogFiles("user", 1111, null, null, origin);
            Response returnedFilterTopoId = handler.listLogFiles("user", null, "topoB", null, origin);

            assertEqualsJsonResponse(expectedAll, returnedAll, List.class);
            assertEqualsJsonResponse(expectedFilterPort, returnedFilterPort, List.class);
            assertEqualsJsonResponse(expectedFilterTopoId, returnedFilterTopoId, List.class);
        } finally {
            Utils.forceDelete(rootPath);
        }
    }

    /** Creates {@code <rootPath>/<topologyId>/<port>/worker.log}, failing the test if it cannot be created. */
    private static void createWorkerLog(String rootPath, String topologyId, String port) throws IOException {
        File logFile = new File(String.join(File.separator, rootPath, topologyId, port), "worker.log");
        File portDir = logFile.getParentFile();
        assertTrue("could not create directory " + portDir, portDir.isDirectory() || portDir.mkdirs());
        assertTrue("could not create file " + logFile, logFile.createNewFile());
    }

    /**
     * Asserts that two responses carry equal JSON entities, equal status codes, and equal headers
     * (ignoring the order of header values).
     *
     * @param expected response built by the test
     * @param actual response returned by the code under test
     * @param entityClass type both JSON entities are deserialised into before comparison
     */
    private <T> void assertEqualsJsonResponse(Response expected, Response actual, Class<T> entityClass)
            throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        T entityFromExpected = objectMapper.readValue((String) expected.getEntity(), entityClass);
        // Read the entity of the ACTUAL response; reading 'expected' twice made this check vacuous.
        T entityFromActual = objectMapper.readValue((String) actual.getEntity(), entityClass);
        assertEquals(entityFromExpected, entityFromActual);

        assertEquals(expected.getStatus(), actual.getStatus());
        assertTrue(expected.getHeaders().equalsIgnoreValueOrder(actual.getHeaders()));
    }
}
