This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
commit 108c42454762c8215c573694b1f71ea1558482a2 Author: tballison <[email protected]> AuthorDate: Mon Feb 1 15:14:51 2021 -0500 TIKA-3289 -- allow creation of multiple tika-server processes from the TikaServerCli commandline --- .../tika/server/core/ServerStatusWatcher.java | 6 +- ...erverTimeouts.java => ServerTimeoutConfig.java} | 2 +- .../org/apache/tika/server/core/TikaServerCli.java | 441 ++++++++------------- .../apache/tika/server/core/TikaServerProcess.java | 424 ++++++++++++++++++++ .../tika/server/core/TikaServerWatchDog.java | 92 ++--- .../apache/tika/server/core/WatchDogResult.java | 51 +++ 6 files changed, 689 insertions(+), 327 deletions(-) diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java index 5e12512..a2ad313 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java @@ -40,7 +40,7 @@ public class ServerStatusWatcher implements Runnable { private final ServerStatus serverStatus; private final DataInputStream fromParent; private final long maxFiles; - private final ServerTimeouts serverTimeouts; + private final ServerTimeoutConfig serverTimeouts; private final Path forkedStatusPath; private final ByteBuffer statusBuffer = ByteBuffer.allocate(16); @@ -51,7 +51,7 @@ public class ServerStatusWatcher implements Runnable { public ServerStatusWatcher(ServerStatus serverStatus, InputStream inputStream, Path forkedStatusPath, long maxFiles, - ServerTimeouts serverTimeouts) throws IOException { + ServerTimeoutConfig serverTimeouts) throws IOException { this.serverStatus = serverStatus; this.maxFiles = maxFiles; this.serverTimeouts = serverTimeouts; @@ -166,7 +166,7 @@ public class ServerStatusWatcher implements Runnable { try { writeStatus(); } catch 
(Exception e) { - LOG.warn("problem writing status before shutdown", e); + LOG.debug("problem writing status before shutdown", e); } //if something went wrong with the parent, diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeouts.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeoutConfig.java similarity index 99% rename from tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeouts.java rename to tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeoutConfig.java index 5159c23..53bcd64 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeouts.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerTimeoutConfig.java @@ -16,7 +16,7 @@ */ package org.apache.tika.server.core; -public class ServerTimeouts { +public class ServerTimeoutConfig { /* TODO: integrate these settings: diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java index 6680751..dbcbeb1 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java @@ -17,59 +17,26 @@ package org.apache.tika.server.core; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; 
+import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; -import org.apache.cxf.binding.BindingFactoryManager; -import org.apache.cxf.jaxrs.JAXRSBindingFactory; -import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; -import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; -import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.rs.security.cors.CrossOriginResourceSharingFilter; -import org.apache.cxf.transport.common.gzip.GZIPInInterceptor; -import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor; import org.apache.tika.Tika; -import org.apache.tika.config.ServiceLoader; -import org.apache.tika.config.TikaConfig; -import org.apache.tika.parser.DigestingParser; -import org.apache.tika.parser.digestutils.BouncyCastleDigester; -import org.apache.tika.parser.digestutils.CommonsDigester; -import org.apache.tika.server.core.resource.DetectorResource; -import org.apache.tika.server.core.resource.EmitterResource; -import org.apache.tika.server.core.resource.LanguageResource; -import org.apache.tika.server.core.resource.MetadataResource; -import org.apache.tika.server.core.resource.RecursiveMetadataResource; -import org.apache.tika.server.core.resource.TikaDetectors; -import org.apache.tika.server.core.resource.TikaMimeTypes; -import org.apache.tika.server.core.resource.TikaParsers; -import org.apache.tika.server.core.resource.TikaResource; -import org.apache.tika.server.core.resource.TikaServerResource; -import org.apache.tika.server.core.resource.TikaServerStatus; -import org.apache.tika.server.core.resource.TikaVersion; -import org.apache.tika.server.core.resource.TikaWelcome; -import org.apache.tika.server.core.resource.TranslateResource; -import org.apache.tika.server.core.resource.UnpackerResource; -import 
org.apache.tika.server.core.writer.CSVMessageBodyWriter; -import org.apache.tika.server.core.writer.JSONMessageBodyWriter; -import org.apache.tika.server.core.writer.JSONObjWriter; -import org.apache.tika.server.core.writer.MetadataListMessageBodyWriter; -import org.apache.tika.server.core.writer.TarWriter; -import org.apache.tika.server.core.writer.TextMessageBodyWriter; -import org.apache.tika.server.core.writer.ZipWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +72,10 @@ public class TikaServerCli { Options options = new Options(); options.addOption("C", "cors", true, "origin allowed to make CORS requests (default=NONE)\nall allowed if \"all\""); options.addOption("h", "host", true, "host name (default = " + DEFAULT_HOST + ", use * for all)"); - options.addOption("p", "port", true, "listen port (default = " + DEFAULT_PORT + ')'); + options.addOption("p", "port", true, + "listen port(s) (default = " + DEFAULT_PORT + ").\n" + + "Can specify multiple ports with inclusive ranges (e.g. 9990-9999)\n" + + "or with comma delimited list (e.g. 9996,9998,9995)"); options.addOption("c", "config", true, "Tika Configuration file to override default config with."); options.addOption("d", "digest", true, "include digest in metadata, e.g. md5,sha1:32,sha256"); options.addOption("dml", "digestMarkLimit", true, "max number of bytes to mark on stream for digest"); @@ -161,25 +131,62 @@ public class TikaServerCli { CommandLineParser cliParser = new DefaultParser(); - //need to strip out -J (forked jvm opts) from this parse - //they'll be processed correctly in args in the watch dog - //and they won't be needed in legacy. 
CommandLine line = cliParser.parse(options, stripForkedArgs(args)); - if (line.hasOption("noFork") || line.hasOption("forkedStatusFile")) { - if (line.hasOption("noFork")) { - //make sure the user didn't misunderstand the options - for (String forkedOnly : ONLY_IN_FORK_MODE) { - if (line.hasOption(forkedOnly)) { - System.err.println("The option '" + forkedOnly + - "' can't be used with '-noFork'"); - usage(options); + String[] newArgs = addDefaults(line, args); + line = cliParser.parse(options, stripForkedArgs(newArgs)); + if (line.hasOption("noFork")) { + noFork(line, newArgs); + } else { + try { + mainLoop(line, newArgs); + } catch (InterruptedException e) { + //swallow + } + } + } + + private static void mainLoop(CommandLine line, String[] origArgs) throws Exception { + + List<String> argList = new ArrayList<>(); + argList.addAll(Arrays.asList(origArgs)); + + NonForkedValues nonForkedValues = extractNonForkedValues(argList); + int maxRestarts = nonForkedValues.maxRestarts; + List<PortIdPair> portIdPairs = getPortIdPairs(nonForkedValues.id, nonForkedValues.portString); + + String[] args = argList.toArray(new String[0]); + + ExecutorService executorService = Executors.newFixedThreadPool(portIdPairs.size()); + ExecutorCompletionService<WatchDogResult> executorCompletionService = new ExecutorCompletionService<>(executorService); + ServerTimeoutConfig serverTimeoutConfig = configureServerTimeouts(line); + for (PortIdPair p : portIdPairs) { + executorCompletionService.submit( + new TikaServerWatchDog(args, p.port, p.id,0, serverTimeoutConfig)); + } + + int finished = 0; + try { + while (finished < portIdPairs.size()) { + Future<WatchDogResult> future = executorCompletionService.poll(1, TimeUnit.MINUTES); + if (future != null) { + LOG.debug("main loop future is available"); + WatchDogResult result = future.get(); + LOG.debug("main loop future: ({}); about to restart", result); + if (maxRestarts < 0 || result.getNumRestarts() < maxRestarts) { + 
executorCompletionService.submit( + new TikaServerWatchDog(args, result.getPort(), + result.getId(), + result.getNumRestarts(), serverTimeoutConfig)); + } else { + LOG.warn("id {} with port {} has exceeded maxRestarts {}. Shutting down and not restarting.", + result.getId(), result.getPort(), maxRestarts); + finished++; } } } - actuallyRunServer(line, options); - } else { - TikaServerWatchDog watchDog = new TikaServerWatchDog(); - watchDog.execute(args, configureServerTimeouts(line)); + } finally { + //this is just asking nicely...there is no guarantee! + executorService.shutdownNow(); } } @@ -193,241 +200,102 @@ public class TikaServerCli { return ret.toArray(new String[0]); } - //This starts the server in this process. This can - //be either a direct call from -noFork or the process that is forked - //in the 2.0 default mode. - private static void actuallyRunServer(CommandLine line, Options options) throws Exception { - if (line.hasOption("help")) { - usage(options); - } - - String host = DEFAULT_HOST; - - if (line.hasOption("host")) { - host = line.getOptionValue("host"); - if ("*".equals(host)) { - host = "0.0.0.0"; - } - } - - int port = DEFAULT_PORT; - - if (line.hasOption("port")) { - port = Integer.valueOf(line.getOptionValue("port")); + //removes and records values that either shouldn't go into the forked + //process or need to be modified + private static NonForkedValues extractNonForkedValues(List<String> args) { + int idIndex = -1; + int portIndex = -1; + int maxRestartIndex = -1; + NonForkedValues nonForked = new NonForkedValues(); + + for (int i = 0; i < args.size()-1; i++) { + if (args.get(i).equals("-i") || args.get(i).equals("--id")) { + idIndex = i; + nonForked.id = args.get(i+1); + } else if (args.get(i).equals("-p") || + args.get(i).equals("--port") || args.get(i).equals("--ports")) { + portIndex = i; + nonForked.portString = args.get(i+1); + } else if (args.get(i).equals("-maxRestarts") + || args.get(i).equals("--maxRestarts")) { + 
maxRestartIndex = i; + nonForked.maxRestarts = Integer.parseInt(args.get(i+1)); } + } - boolean returnStackTrace = false; - if (line.hasOption("includeStack")) { - returnStackTrace = true; - } - TikaLoggingFilter logFilter = null; - if (line.hasOption("log")) { - String logLevel = line.getOptionValue("log"); - if (LOG_LEVELS.contains(logLevel)) { - boolean isInfoLevel = "info".equals(logLevel); - logFilter = new TikaLoggingFilter(isInfoLevel); - } else { - LOG.info("Unsupported request URI log level: {}", logLevel); - } + //now remove -i and -p and their values from args + List<String> copy = new ArrayList<>(); + copy.addAll(args); + args.clear(); + for(int i = 0; i < copy.size(); i++) { + if (i == idIndex || i == portIndex || i == maxRestartIndex) { + i++; + continue; } + args.add(copy.get(i)); + } - CrossOriginResourceSharingFilter corsFilter = null; - if (line.hasOption("cors")) { - corsFilter = new CrossOriginResourceSharingFilter(); - String url = line.getOptionValue("cors"); - List<String> origins = new ArrayList<String>(); - if (!url.equals("*")) origins.add(url); // Empty list allows all origins. 
- corsFilter.setAllowOrigins(origins); - } - - // The Tika Configuration to use throughout - TikaConfig tika; - - if (line.hasOption("config")){ - String configFilePath = line.getOptionValue("config"); - LOG.info("Using custom config: {}", configFilePath); - tika = new TikaConfig(configFilePath); - } else{ - tika = TikaConfig.getDefaultConfig(); - } + return nonForked; + } - DigestingParser.Digester digester = null; - if (line.hasOption("digest")){ - int digestMarkLimit = DEFAULT_DIGEST_MARK_LIMIT; - if (line.hasOption("dml")) { - String dmlS = line.getOptionValue("dml"); - try { - digestMarkLimit = Integer.parseInt(dmlS); - } catch (NumberFormatException e) { - throw new RuntimeException("Must have parseable int after digestMarkLimit(dml): "+dmlS); - } - } - try { - digester = new CommonsDigester(digestMarkLimit, line.getOptionValue("digest")); - } catch (IllegalArgumentException commonsException) { - try { - digester = new BouncyCastleDigester(digestMarkLimit, line.getOptionValue("digest")); - } catch (IllegalArgumentException bcException) { - throw new IllegalArgumentException("Tried both CommonsDigester ("+commonsException.getMessage()+ - ") and BouncyCastleDigester ("+bcException.getMessage()+")", bcException); - } - } + public static void noFork(CommandLine line, String[] args) { + //make sure the user didn't misunderstand the options + for (String forkedOnly : ONLY_IN_FORK_MODE) { + if (line.hasOption(forkedOnly)) { + System.err.println("The option '" + forkedOnly + + "' can't be used with '-noFork'"); + usage(getOptions()); } - - InputStreamFactory inputStreamFactory = null; - if (line.hasOption("enableUnsecureFeatures")) { - inputStreamFactory = new FetcherStreamFactory(tika.getFetcherManager()); - LOG.info(UNSECURE_WARNING); - } else { - inputStreamFactory = new DefaultInputStreamFactory(); + } + if (line.hasOption("p")) { + String val = line.getOptionValue("p"); + try { + Integer.parseInt(val); + } catch (NumberFormatException e) { + 
System.err.println("-p must be a single integer in -noFork mode. I see: "+val); + usage(getOptions()); } - logFetchersAndEmitters(line.hasOption("enableUnsecureFeatures"), tika); - String serverId = line.hasOption("i") ? line.getOptionValue("i") : UUID.randomUUID().toString(); - LOG.debug("SERVER ID:" +serverId); - ServerStatus serverStatus; - - if (line.hasOption("noFork")) { - serverStatus = new ServerStatus(serverId, 0, true); - } else { - serverStatus = new ServerStatus(serverId, Integer.parseInt(line.getOptionValue("numRestarts")), - false); - //redirect!!! - InputStream in = System.in; - System.setIn(new ByteArrayInputStream(new byte[0])); - System.setOut(System.err); - - long maxFiles = DEFAULT_MAX_FILES; - if (line.hasOption("maxFiles")) { - maxFiles = Long.parseLong(line.getOptionValue("maxFiles")); - } + } + TikaServerProcess.main(args); + } - ServerTimeouts serverTimeouts = configureServerTimeouts(line); - String forkedStatusFile = line.getOptionValue("forkedStatusFile"); - Thread serverThread = - new Thread(new ServerStatusWatcher(serverStatus, in, - Paths.get(forkedStatusFile), maxFiles, serverTimeouts)); + private static String[] addDefaults(CommandLine line, String[] args) { + List<String> newArr = new ArrayList<>(Arrays.asList(args)); + if (! line.hasOption("p")) { + newArr.add("-p"); + newArr.add(Integer.toString(DEFAULT_PORT)); + } + if (! 
line.hasOption("h")) { + newArr.add("-h"); + newArr.add(DEFAULT_HOST); + } - serverThread.start(); - } - TikaResource.init(tika, digester, inputStreamFactory, serverStatus); - JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - - List<ResourceProvider> rCoreProviders = new ArrayList<>(); - rCoreProviders.add(new SingletonResourceProvider(new MetadataResource())); - rCoreProviders.add(new SingletonResourceProvider(new RecursiveMetadataResource())); - rCoreProviders.add(new SingletonResourceProvider(new DetectorResource(serverStatus))); - rCoreProviders.add(new SingletonResourceProvider(new LanguageResource())); - rCoreProviders.add(new SingletonResourceProvider(new TranslateResource(serverStatus))); - rCoreProviders.add(new SingletonResourceProvider(new TikaResource())); - rCoreProviders.add(new SingletonResourceProvider(new UnpackerResource())); - rCoreProviders.add(new SingletonResourceProvider(new TikaMimeTypes())); - rCoreProviders.add(new SingletonResourceProvider(new TikaDetectors())); - rCoreProviders.add(new SingletonResourceProvider(new TikaParsers())); - rCoreProviders.add(new SingletonResourceProvider(new TikaVersion())); - if (line.hasOption("enableUnsecureFeatures")) { - rCoreProviders.add(new SingletonResourceProvider(new EmitterResource())); - } - rCoreProviders.addAll(loadResourceServices()); - if (line.hasOption("status")) { - rCoreProviders.add(new SingletonResourceProvider(new TikaServerStatus(serverStatus))); - } - List<ResourceProvider> rAllProviders = new ArrayList<>(rCoreProviders); - rAllProviders.add(new SingletonResourceProvider(new TikaWelcome(rCoreProviders))); - sf.setResourceProviders(rAllProviders); - - List<Object> providers = new ArrayList<>(); - providers.add(new TarWriter()); - providers.add(new ZipWriter()); - providers.add(new CSVMessageBodyWriter()); - providers.add(new MetadataListMessageBodyWriter()); - providers.add(new JSONMessageBodyWriter()); - providers.add(new TextMessageBodyWriter()); - 
providers.addAll(loadWriterServices()); - providers.add(new TikaServerParseExceptionMapper(returnStackTrace)); - providers.add(new JSONObjWriter()); - - if (logFilter != null) { - providers.add(logFilter); - } - if (corsFilter != null) { - providers.add(corsFilter); - } - sf.setProviders(providers); - - //set compression interceptors - sf.setOutInterceptors( - Collections.singletonList(new GZIPOutInterceptor()) - ); - sf.setInInterceptors( - Collections.singletonList(new GZIPInInterceptor())); - - String url = "http://" + host + ":" + port + "/"; - sf.setAddress(url); - BindingFactoryManager manager = sf.getBus().getExtension(BindingFactoryManager.class); - JAXRSBindingFactory factory = new JAXRSBindingFactory(); - factory.setBus(sf.getBus()); - manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory); - sf.create(); - LOG.info("Started Apache Tika server at {}", url); + if (! line.hasOption("i")) { + newArr.add("-i"); + newArr.add(UUID.randomUUID().toString()); + } + return newArr.toArray(new String[0]); } - private static void logFetchersAndEmitters(boolean enableUnsecureFeatures, TikaConfig tika) { - if (enableUnsecureFeatures) { - StringBuilder sb = new StringBuilder(); - Set<String> supportedFetchers = tika.getFetcherManager().getSupported(); - sb.append("enableSecureFeatures has been selected.\n"); - if (supportedFetchers.size() == 0) { - sb.append("There are no fetchers specified in the TikaConfig"); - } else { - sb.append("The following fetchers are available to whomever has access to this server:\n"); - for (String p : supportedFetchers) { - sb.append(p).append("\n"); + private static List<PortIdPair> getPortIdPairs(String idString, String portsArg) { + List<PortIdPair> pairs = new ArrayList<>(); + Matcher m = Pattern.compile("^(\\d+)-(\\d+)\\Z").matcher(""); + for (String val : portsArg.split(",")) { + m.reset(val); + if (m.find()) { + int min = Math.min(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))); + int max = 
Math.max(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))); + for (int i = min; i <= max; i++) { + pairs.add(new PortIdPair(i, idString+"-"+i)); } - } - Set<String> emitters = tika.getEmitterManager().getSupported(); - if (supportedFetchers.size() == 0) { - sb.append("There are no emitters specified in the TikaConfig"); } else { - sb.append("The following emitters are available to whomever has access to this server:\n"); - for (String e : emitters) { - sb.append(e).append("\n"); - } - } - LOG.info(sb.toString()); - } else { - if (tika.getEmitterManager().getSupported().size() > 0) { - String warn = "-enableUnsecureFeatures has not been specified on the commandline.\n"+ - "The "+tika.getEmitterManager().getSupported().size() + " emitter(s) that you've\n"+ - "specified in TikaConfig will not be available on the /emit endpoint\n"+ - "To enable your emitters, start tika-server with the -enableUnsecureFeatures flag\n\n"; - LOG.warn(warn); + pairs.add(new PortIdPair(Integer.parseInt(val), idString+"-"+val)); } - if (tika.getFetcherManager().getSupported().size() > 0) { - String warn = "-enableUnsecureFeatures has not been specified on the commandline.\n"+ - "The "+tika.getFetcherManager().getSupported().size() + " fetcher(s) that you've\n"+ - "specified in TikaConfig will not be available\n"+ - "To enable your fetchers, start tika-server with the -enableUnsecureFeatures flag\n\n"; - LOG.warn(warn); - } - } - } - - private static Collection<? 
extends ResourceProvider> loadResourceServices() { - List<TikaServerResource> resources = new ServiceLoader(TikaServerCli.class.getClassLoader()) - .loadServiceProviders(TikaServerResource.class); - List<ResourceProvider> providers = new ArrayList<>(); - - for (TikaServerResource r : resources) { - providers.add(new SingletonResourceProvider(r)); } - return providers; + return pairs; } - private static Collection<?> loadWriterServices() { - return new ServiceLoader(TikaServerCli.class.getClassLoader()) - .loadServiceProviders(org.apache.tika.server.core.writer.TikaServerWriter.class); - } private static void usage(Options options) { HelpFormatter helpFormatter = new HelpFormatter(); @@ -435,8 +303,8 @@ public class TikaServerCli { System.exit(-1); } - private static ServerTimeouts configureServerTimeouts(CommandLine line) { - ServerTimeouts serverTimeouts = new ServerTimeouts(); + private static ServerTimeoutConfig configureServerTimeouts(CommandLine line) { + ServerTimeoutConfig serverTimeouts = new ServerTimeoutConfig(); /*TODO -- add these in if (line.hasOption("forkedProcessStartupMillis")) { serverTimeouts.setForkedProcessStartupMillis( @@ -471,4 +339,27 @@ public class TikaServerCli { return serverTimeouts; } + private static class PortIdPair { + int port; + String id; + + public PortIdPair(int port, String id) { + this.port = port; + this.id = id; + } + } + + /** + * these are parameters that should not go + * directly into the forked process. They + * are either used by the forking process or + * they are modified or may be modified before + * creating the forked process. 
+ */ + private static class NonForkedValues { + String portString; + String id; + int maxRestarts = -1; + + } } diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java new file mode 100644 index 0000000..0a8fd91 --- /dev/null +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tika.server.core; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.cxf.binding.BindingFactoryManager; +import org.apache.cxf.jaxrs.JAXRSBindingFactory; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.ResourceProvider; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.rs.security.cors.CrossOriginResourceSharingFilter; +import org.apache.cxf.transport.common.gzip.GZIPInInterceptor; +import org.apache.cxf.transport.common.gzip.GZIPOutInterceptor; +import org.apache.tika.Tika; +import org.apache.tika.config.ServiceLoader; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.parser.DigestingParser; +import org.apache.tika.parser.digestutils.BouncyCastleDigester; +import org.apache.tika.parser.digestutils.CommonsDigester; +import org.apache.tika.server.core.resource.DetectorResource; +import org.apache.tika.server.core.resource.EmitterResource; +import org.apache.tika.server.core.resource.LanguageResource; +import org.apache.tika.server.core.resource.MetadataResource; +import org.apache.tika.server.core.resource.RecursiveMetadataResource; +import org.apache.tika.server.core.resource.TikaDetectors; +import org.apache.tika.server.core.resource.TikaMimeTypes; +import org.apache.tika.server.core.resource.TikaParsers; +import org.apache.tika.server.core.resource.TikaResource; +import org.apache.tika.server.core.resource.TikaServerResource; +import org.apache.tika.server.core.resource.TikaServerStatus; +import org.apache.tika.server.core.resource.TikaVersion; +import org.apache.tika.server.core.resource.TikaWelcome; +import org.apache.tika.server.core.resource.TranslateResource; +import org.apache.tika.server.core.resource.UnpackerResource; +import 
org.apache.tika.server.core.writer.CSVMessageBodyWriter; +import org.apache.tika.server.core.writer.JSONMessageBodyWriter; +import org.apache.tika.server.core.writer.JSONObjWriter; +import org.apache.tika.server.core.writer.MetadataListMessageBodyWriter; +import org.apache.tika.server.core.writer.TarWriter; +import org.apache.tika.server.core.writer.TextMessageBodyWriter; +import org.apache.tika.server.core.writer.ZipWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +public class TikaServerProcess { + + + //used in fork mode -- restart after processing this many files + private static final long DEFAULT_MAX_FILES = 100000; + + + private static final int DEFAULT_DIGEST_MARK_LIMIT = 20 * 1024 * 1024; + public static final Set<String> LOG_LEVELS = new HashSet<>(Arrays.asList("debug", "info")); + private static final Logger LOG = LoggerFactory.getLogger(TikaServerProcess.class); + + + private static Options getOptions() { + Options options = new Options(); + options.addOption("C", "cors", true, "origin allowed to make CORS requests (default=NONE)\nall allowed if \"all\""); + options.addOption("h", "host", true, "host name, use * for all)"); + options.addOption("p", "port", true, "listen port"); + options.addOption("c", "config", true, "Tika Configuration file to override default config with."); + options.addOption("d", "digest", true, "include digest in metadata, e.g. 
md5,sha1:32,sha256"); + options.addOption("dml", "digestMarkLimit", true, "max number of bytes to mark on stream for digest"); + options.addOption("l", "log", true, "request URI log level ('debug' or 'info')"); + options.addOption("s", "includeStack", false, "whether or not to return a stack trace\nif there is an exception during 'parse'"); + options.addOption("i", "id", true, "id to use for server in server status endpoint"); + options.addOption("status", false, "enable the status endpoint"); + options.addOption("?", "help", false, "this help message"); + options.addOption("enableUnsecureFeatures", false, "this is required to enable fetchers and emitters. " + + " The user acknowledges that fetchers and emitters introduce potential security vulnerabilities."); + options.addOption("noFork", false, "legacy mode, less robust -- this starts up tika-server" + + " without forking a process."); + options.addOption("taskTimeoutMillis", true, + "Not allowed in -noFork: how long to wait for a task (e.g. parse) to finish"); + options.addOption("taskPulseMillis", true, + "Not allowed in -noFork: how often to check if a task has timed out."); + options.addOption("pingTimeoutMillis", true, + "Not allowed in -noFork: how long to wait to wait for a ping and/or ping response."); + options.addOption("pingPulseMillis", true, + "Not allowed in -noFork: how often to check if a ping has timed out."); + options.addOption("maxFiles", true, + "Not allowed in -noFork: shutdown server after this many files (to handle parsers that might introduce " + + "slowly building memory leaks); the default is " + DEFAULT_MAX_FILES + ". Set to -1 to turn this off."); + options.addOption("javaHome", true, + "Not allowed in -noFork: override system property JAVA_HOME for calling java for the forked process"); + options.addOption("forkedStatusFile", true, + "Not allowed in -noFork: temporary file used to communicate " + + "with forking process -- do not use this! 
Should only be invoked by forking process."); + options.addOption("tmpFilePrefix", true, + "Not allowed in -noFork: prefix for temp file - for debugging only"); + options.addOption("numRestarts", true, + "Not allowed in -noFork: number of times that the forked server has had to be restarted."); + return options; + } + + public static void main(String[] args) { + LOG.info("Starting {} server", new Tika()); + try { + Options options = getOptions(); + + CommandLineParser cliParser = new DefaultParser(); + CommandLine line = cliParser.parse(options, args); + runServer(line, options); + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Can't start: ", e); + System.exit(-1); + } + } + + //This starts the server in this process. This can + //be either a direct call from -noFork or the process that is forked + //in the 2.0 default mode. + private static void runServer(CommandLine line, Options options) throws Exception { + + String host = null; + + if (line.hasOption("host")) { + host = line.getOptionValue("host"); + if ("*".equals(host)) { + host = "0.0.0.0"; + } + } else { + throw new IllegalArgumentException("Must specify 'host'"); + } + + int port = -1; + + if (line.hasOption("port")) { + port = Integer.valueOf(line.getOptionValue("port")); + } else { + throw new IllegalArgumentException("Must specify port"); + } + + boolean returnStackTrace = false; + if (line.hasOption("includeStack")) { + returnStackTrace = true; + } + + TikaLoggingFilter logFilter = null; + if (line.hasOption("log")) { + String logLevel = line.getOptionValue("log"); + if (LOG_LEVELS.contains(logLevel)) { + boolean isInfoLevel = "info".equals(logLevel); + logFilter = new TikaLoggingFilter(isInfoLevel); + } else { + LOG.info("Unsupported request URI log level: {}", logLevel); + } + } + + CrossOriginResourceSharingFilter corsFilter = null; + if (line.hasOption("cors")) { + corsFilter = new CrossOriginResourceSharingFilter(); + String url = line.getOptionValue("cors"); + List<String> origins 
= new ArrayList<>(); + if (!url.equals("*")) origins.add(url); // Empty list allows all origins. + corsFilter.setAllowOrigins(origins); + } + + // The Tika Configuration to use throughout + TikaConfig tika; + + if (line.hasOption("config")) { + String configFilePath = line.getOptionValue("config"); + LOG.info("Using custom config: {}", configFilePath); + tika = new TikaConfig(configFilePath); + } else { + tika = TikaConfig.getDefaultConfig(); + } + + DigestingParser.Digester digester = null; + if (line.hasOption("digest")) { + int digestMarkLimit = DEFAULT_DIGEST_MARK_LIMIT; + if (line.hasOption("dml")) { + String dmlS = line.getOptionValue("dml"); + try { + digestMarkLimit = Integer.parseInt(dmlS); + } catch (NumberFormatException e) { + throw new RuntimeException("Must have parseable int after digestMarkLimit(dml): " + dmlS); + } + } + try { + digester = new CommonsDigester(digestMarkLimit, line.getOptionValue("digest")); + } catch (IllegalArgumentException commonsException) { + try { + digester = new BouncyCastleDigester(digestMarkLimit, line.getOptionValue("digest")); + } catch (IllegalArgumentException bcException) { + throw new IllegalArgumentException("Tried both CommonsDigester (" + commonsException.getMessage() + + ") and BouncyCastleDigester (" + bcException.getMessage() + ")", bcException); + } + } + } + + InputStreamFactory inputStreamFactory = null; + if (line.hasOption("enableUnsecureFeatures")) { + inputStreamFactory = new FetcherStreamFactory(tika.getFetcherManager()); + } else { + inputStreamFactory = new DefaultInputStreamFactory(); + } + logFetchersAndEmitters(line.hasOption("enableUnsecureFeatures"), tika); + String serverId = line.hasOption("i") ? 
line.getOptionValue("i") : UUID.randomUUID().toString(); + LOG.debug("SERVER ID:" + serverId); + ServerStatus serverStatus; + + if (line.hasOption("noFork")) { + serverStatus = new ServerStatus(serverId, 0, true); + } else { + serverStatus = new ServerStatus(serverId, Integer.parseInt(line.getOptionValue("numRestarts")), + false); + //redirect!!! + InputStream in = System.in; + System.setIn(new ByteArrayInputStream(new byte[0])); + System.setOut(System.err); + + long maxFiles = DEFAULT_MAX_FILES; + if (line.hasOption("maxFiles")) { + maxFiles = Long.parseLong(line.getOptionValue("maxFiles")); + } + + ServerTimeoutConfig serverTimeouts = configureServerTimeouts(line); + String forkedStatusFile = line.getOptionValue("forkedStatusFile"); + Thread serverThread = + new Thread(new ServerStatusWatcher(serverStatus, in, + Paths.get(forkedStatusFile), maxFiles, serverTimeouts)); + + serverThread.start(); + } + TikaResource.init(tika, digester, inputStreamFactory, serverStatus); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + + List<ResourceProvider> rCoreProviders = new ArrayList<>(); + rCoreProviders.add(new SingletonResourceProvider(new MetadataResource())); + rCoreProviders.add(new SingletonResourceProvider(new RecursiveMetadataResource())); + rCoreProviders.add(new SingletonResourceProvider(new DetectorResource(serverStatus))); + rCoreProviders.add(new SingletonResourceProvider(new LanguageResource())); + rCoreProviders.add(new SingletonResourceProvider(new TranslateResource(serverStatus))); + rCoreProviders.add(new SingletonResourceProvider(new TikaResource())); + rCoreProviders.add(new SingletonResourceProvider(new UnpackerResource())); + rCoreProviders.add(new SingletonResourceProvider(new TikaMimeTypes())); + rCoreProviders.add(new SingletonResourceProvider(new TikaDetectors())); + rCoreProviders.add(new SingletonResourceProvider(new TikaParsers())); + rCoreProviders.add(new SingletonResourceProvider(new TikaVersion())); + if 
(line.hasOption("enableUnsecureFeatures")) { + rCoreProviders.add(new SingletonResourceProvider(new EmitterResource())); + } + rCoreProviders.addAll(loadResourceServices()); + if (line.hasOption("status")) { + rCoreProviders.add(new SingletonResourceProvider(new TikaServerStatus(serverStatus))); + } + List<ResourceProvider> rAllProviders = new ArrayList<>(rCoreProviders); + rAllProviders.add(new SingletonResourceProvider(new TikaWelcome(rCoreProviders))); + sf.setResourceProviders(rAllProviders); + + List<Object> providers = new ArrayList<>(); + providers.add(new TarWriter()); + providers.add(new ZipWriter()); + providers.add(new CSVMessageBodyWriter()); + providers.add(new MetadataListMessageBodyWriter()); + providers.add(new JSONMessageBodyWriter()); + providers.add(new TextMessageBodyWriter()); + providers.addAll(loadWriterServices()); + providers.add(new TikaServerParseExceptionMapper(returnStackTrace)); + providers.add(new JSONObjWriter()); + + if (logFilter != null) { + providers.add(logFilter); + } + if (corsFilter != null) { + providers.add(corsFilter); + } + sf.setProviders(providers); + + //set compression interceptors + sf.setOutInterceptors( + Collections.singletonList(new GZIPOutInterceptor()) + ); + sf.setInInterceptors( + Collections.singletonList(new GZIPInInterceptor())); + + String url = "http://" + host + ":" + port + "/"; + sf.setAddress(url); + BindingFactoryManager manager = sf.getBus().getExtension(BindingFactoryManager.class); + JAXRSBindingFactory factory = new JAXRSBindingFactory(); + factory.setBus(sf.getBus()); + manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory); + sf.create(); + LOG.info("Started Apache Tika server {} at {}", + serverId, + url); + + } + + private static void logFetchersAndEmitters(boolean enableUnsecureFeatures, TikaConfig tika) { + if (enableUnsecureFeatures) { + StringBuilder sb = new StringBuilder(); + Set<String> supportedFetchers = tika.getFetcherManager().getSupported(); + 
sb.append("enableSecureFeatures has been selected.\n"); + if (supportedFetchers.size() == 0) { + sb.append("There are no fetchers specified in the TikaConfig"); + } else { + sb.append("The following fetchers are available to whomever has access to this server:\n"); + for (String p : supportedFetchers) { + sb.append(p).append("\n"); + } + } + Set<String> emitters = tika.getEmitterManager().getSupported(); + if (supportedFetchers.size() == 0) { + sb.append("There are no emitters specified in the TikaConfig"); + } else { + sb.append("The following emitters are available to whomever has access to this server:\n"); + for (String e : emitters) { + sb.append(e).append("\n"); + } + } + LOG.info(sb.toString()); + } else { + if (tika.getEmitterManager().getSupported().size() > 0) { + String warn = "-enableUnsecureFeatures has not been specified on the commandline.\n" + + "The " + tika.getEmitterManager().getSupported().size() + " emitter(s) that you've\n" + + "specified in TikaConfig will not be available on the /emit endpoint\n" + + "To enable your emitters, start tika-server with the -enableUnsecureFeatures flag\n\n"; + LOG.warn(warn); + } + if (tika.getFetcherManager().getSupported().size() > 0) { + String warn = "-enableUnsecureFeatures has not been specified on the commandline.\n" + + "The " + tika.getFetcherManager().getSupported().size() + " fetcher(s) that you've\n" + + "specified in TikaConfig will not be available\n" + + "To enable your fetchers, start tika-server with the -enableUnsecureFeatures flag\n\n"; + LOG.warn(warn); + } + } + } + + private static Collection<? 
extends ResourceProvider> loadResourceServices() { + List<TikaServerResource> resources = new ServiceLoader(TikaServerProcess.class.getClassLoader()) + .loadServiceProviders(TikaServerResource.class); + List<ResourceProvider> providers = new ArrayList<>(); + + for (TikaServerResource r : resources) { + providers.add(new SingletonResourceProvider(r)); + } + return providers; + } + + private static Collection<?> loadWriterServices() { + return new ServiceLoader(TikaServerProcess.class.getClassLoader()) + .loadServiceProviders(org.apache.tika.server.core.writer.TikaServerWriter.class); + } + + private static void usage(Options options) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("tikaserver", options); + System.exit(-1); + } + + private static ServerTimeoutConfig configureServerTimeouts(CommandLine line) { + ServerTimeoutConfig serverTimeouts = new ServerTimeoutConfig(); + /*TODO -- add these in + if (line.hasOption("forkedProcessStartupMillis")) { + serverTimeouts.setForkedProcessStartupMillis( + Long.parseLong(line.getOptionValue("forkedProcessStartupMillis"))); + } + if (line.hasOption("forkedProcessShutdownMillis")) { + serverTimeouts.setForkedProcessShutdownMillis( + Long.parseLong(line.getOptionValue("forkedProcesShutdownMillis"))); + }*/ + if (line.hasOption("taskTimeoutMillis")) { + serverTimeouts.setTaskTimeoutMillis( + Long.parseLong(line.getOptionValue("taskTimeoutMillis"))); + } + if (line.hasOption("pingTimeoutMillis")) { + serverTimeouts.setPingTimeoutMillis( + Long.parseLong(line.getOptionValue("pingTimeoutMillis"))); + } + if (line.hasOption("pingPulseMillis")) { + serverTimeouts.setPingPulseMillis( + Long.parseLong(line.getOptionValue("pingPulseMillis"))); + } + + if (line.hasOption("maxRestarts")) { + serverTimeouts.setMaxRestarts(Integer.parseInt(line.getOptionValue("maxRestarts"))); + } + + if (line.hasOption("maxForkedStartupMillis")) { + serverTimeouts.setMaxForkedStartupMillis( + 
Long.parseLong(line.getOptionValue("maxForkedStartupMillis"))); + } + + return serverTimeouts; + } + +} diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java index 6247463..3670553 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java @@ -39,14 +39,16 @@ import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; -public class TikaServerWatchDog { +public class TikaServerWatchDog implements Callable<WatchDogResult> { private enum FORKED_STATUS { INITIALIZING, @@ -62,32 +64,40 @@ public class TikaServerWatchDog { private volatile Instant lastPing = null; private ForkedProcess forkedProcess = null; + private final String[] args; + private final int port; + private final String id; + private final int restarts; + private final ServerTimeoutConfig serverTimeoutConfig; + + TikaServerWatchDog(String[] args, int port, String id, int restarts, + ServerTimeoutConfig serverTimeoutConfig) { + this.args = addPortAndId(args, port, id); + this.port = port; + this.id = id; + this.restarts = restarts; + this.serverTimeoutConfig = serverTimeoutConfig; + } + + private static String[] addPortAndId(String[] args, int port, String id) { + List<String> newArgs = new ArrayList<>(); + newArgs.addAll(Arrays.asList(args)); + newArgs.add("-p"); + newArgs.add(Integer.toString(port)); + newArgs.add("-i"); + newArgs.add(id); + return newArgs.toArray(new String[0]); + } - public void 
execute(String[] args, ServerTimeouts serverTimeouts) throws Exception { - args = addIdIfMissing(args); + @Override + public WatchDogResult call() throws Exception { LOG.info("server watch dog is starting up"); - startPingTimer(serverTimeouts); try { - int restarts = 0; - forkedProcess = new ForkedProcess(args, restarts, serverTimeouts); + forkedProcess = new ForkedProcess(args, restarts, serverTimeoutConfig); setForkedStatus(FORKED_STATUS.RUNNING); - while (true) { - if (!forkedProcess.ping()) { - LOG.debug("bad ping, initializing"); - restarts++; - setForkedStatus(FORKED_STATUS.INITIALIZING); - lastPing = null; - forkedProcess.close(); - LOG.debug("About to restart the forked process"); - forkedProcess = new ForkedProcess(args, restarts, serverTimeouts); - LOG.info("Successfully restarted forked process -- {} restarts so far)", restarts); - setForkedStatus(FORKED_STATUS.RUNNING); - if (serverTimeouts.getMaxRestarts() > -1 && restarts >= serverTimeouts.getMaxRestarts()) { - LOG.warn("hit max restarts: "+restarts+". 
Stopping now"); - break; - } - } - Thread.sleep(serverTimeouts.getPingPulseMillis()); + startPingTimer(serverTimeoutConfig); + while (forkedProcess.ping()) { + Thread.sleep(serverTimeoutConfig.getPingPulseMillis()); } } catch (InterruptedException e) { //interrupted...shutting down @@ -99,24 +109,10 @@ public class TikaServerWatchDog { forkedProcess.close(); } } + return new WatchDogResult(port, id,restarts+1); } - private String[] addIdIfMissing(String[] args) { - for (String arg : args) { - //id is already specified, leave the array as is - if (arg.equals("-i") || arg.equals("--id")) { - return args; - } - } - - String[] newArgs = new String[args.length+2]; - System.arraycopy(args, 0, newArgs, 0, args.length); - newArgs[args.length] = "-i"; - newArgs[args.length+1] = UUID.randomUUID().toString(); - return newArgs; - } - - private void startPingTimer(ServerTimeouts serverTimeouts) { + private void startPingTimer(ServerTimeoutConfig serverTimeouts) { //if the forked thread is in stop-the-world mode, and isn't //reading the ping, this thread checks to make sure //that the parent ping is sent often enough. 
@@ -232,11 +228,11 @@ public class TikaServerWatchDog { private final Process process; private final DataOutputStream toForked; - private final ServerTimeouts serverTimeouts; + private final ServerTimeoutConfig serverTimeoutConfig; private final Path forkedStatusFile; private final ByteBuffer statusBuffer = ByteBuffer.allocate(16); - private ForkedProcess(String[] args, int numRestarts, ServerTimeouts serverTimeouts) throws Exception { + private ForkedProcess(String[] args, int numRestarts, ServerTimeoutConfig serverTimeoutConfig) throws Exception { String prefix = DEFAULT_FORKED_STATUS_FILE_PREFIX; for (int i = 0; i < args.length; i++) { if (args[i].equals("-tmpFilePrefix")) { @@ -245,7 +241,7 @@ public class TikaServerWatchDog { } this.forkedStatusFile = Files.createTempFile(prefix, ""); - this.serverTimeouts = serverTimeouts; + this.serverTimeoutConfig = serverTimeoutConfig; this.process = startProcess(args, numRestarts, forkedStatusFile); //wait for file to be written/initialized by forked process @@ -253,7 +249,7 @@ public class TikaServerWatchDog { long elapsed = Duration.between(start, Instant.now()).toMillis(); try { while (process.isAlive() && Files.size(forkedStatusFile) < 12 - && elapsed < serverTimeouts.getMaxForkedStartupMillis()) { + && elapsed < serverTimeoutConfig.getMaxForkedStartupMillis()) { Thread.sleep(50); elapsed = Duration.between(start, Instant.now()).toMillis(); } @@ -263,7 +259,7 @@ public class TikaServerWatchDog { LOG.warn("failed to start forked process", e); } - if (elapsed > serverTimeouts.getMaxForkedStartupMillis()) { + if (elapsed > serverTimeoutConfig.getMaxForkedStartupMillis()) { close(); throw new RuntimeException("Forked process failed to start after "+elapsed + " (ms)"); } @@ -312,7 +308,7 @@ public class TikaServerWatchDog { forkedStatus.status); if (elapsedSinceLastUpdate > - serverTimeouts.getPingTimeoutMillis()) { + serverTimeoutConfig.getPingTimeoutMillis()) { //forked hasn't written a status update in a longer time 
than allowed LOG.warn("Forked's last update exceeded ping timeout: {} (ms) with status {}", elapsedSinceLastUpdate, forkedStatus.status); @@ -329,7 +325,7 @@ public class TikaServerWatchDog { //only reading, but need to include write to allow for locking try (FileChannel fc = FileChannel.open(forkedStatusFile, READ, WRITE)) { - while (elapsed < serverTimeouts.getPingTimeoutMillis()) { + while (elapsed < serverTimeoutConfig.getPingTimeoutMillis()) { try (FileLock lock = fc.tryLock(0, 16, true)) { if (lock != null) { ((Buffer)statusBuffer).position(0); @@ -402,11 +398,11 @@ public class TikaServerWatchDog { jvmArgs.add(cp); } argList.addAll(jvmArgs); - argList.add("org.apache.tika.server.core.TikaServerCli"); + argList.add("org.apache.tika.server.core.TikaServerProcess"); argList.addAll(forkedArgs); argList.add("-numRestarts"); argList.add(Integer.toString(numRestarts)); - LOG.info("forked process commandline: " +argList.toString()); + LOG.debug("forked process commandline: " +argList.toString()); builder.command(argList); Process process = builder.start(); //redirect stdout to parent stderr to avoid error msgs diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/WatchDogResult.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/WatchDogResult.java new file mode 100644 index 0000000..03174d3 --- /dev/null +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/WatchDogResult.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Immutable outcome of one watch dog run: the port and id of the server that was being
 * watched and the restart count to carry forward.
 * <p>
 * {@code TikaServerWatchDog.call()} returns an instance built from its own port and id and
 * its restart count plus one.
 */
public class WatchDogResult {

    private final int port;
    private final String id;
    // named to match getNumRestarts() and the value the watch dog passes in
    private final int numRestarts;

    /**
     * @param port        port of the watched server
     * @param id          id of the watched server
     * @param numRestarts restart count reported by the watch dog
     */
    public WatchDogResult(int port, String id, int numRestarts) {
        this.port = port;
        this.id = id;
        this.numRestarts = numRestarts;
    }

    /**
     * @return port of the watched server
     */
    public int getPort() {
        return port;
    }

    /**
     * @return restart count reported by the watch dog
     */
    public int getNumRestarts() {
        return numRestarts;
    }

    /**
     * @return id of the watched server
     */
    public String getId() {
        return id;
    }

    @Override
    public String toString() {
        return "WatchDogResult{" +
                "port=" + port +
                ", id='" + id + '\'' +
                ", numRestarts=" + numRestarts +
                '}';
    }
}
