This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new da1ba84 TIKA-3410 -- implement actual logging in PipesServer
da1ba84 is described below
commit da1ba84c32d53f7d9d42da0356a9b4b76b275e5e
Author: tallison <[email protected]>
AuthorDate: Thu May 20 10:12:14 2021 -0400
TIKA-3410 -- implement actual logging in PipesServer
---
tika-core/pom.xml | 6 ++
.../java/org/apache/tika/pipes/PipesClient.java | 63 +++--------------
.../java/org/apache/tika/pipes/PipesServer.java | 81 ++++++++--------------
.../resources/pipes-fork-server-default-log4j2.xml | 32 +++++++++
.../apache/tika/fork/ForkParserTikaBinTest.java | 3 +-
.../tika/pipes/solrtest/TikaPipesSolrTestBase.java | 4 +-
.../resources/pipes-fork-server-custom-log4j2.xml | 32 +++++++++
7 files changed, 114 insertions(+), 107 deletions(-)
diff --git a/tika-core/pom.xml b/tika-core/pom.xml
index 8a2b2ae..2bbfbab 100644
--- a/tika-core/pom.xml
+++ b/tika-core/pom.xml
@@ -94,6 +94,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 1b2eecd..330e5db 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -16,15 +16,12 @@
*/
package org.apache.tika.pipes;
-import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
@@ -49,8 +46,6 @@ public class PipesClient implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(PipesClient.class);
private Process process;
- private LogGobbler logGobbler;
- private Thread logGobblerThread;
private final PipesConfigBase pipesConfig;
private DataOutputStream output;
private DataInputStream input;
@@ -229,22 +224,17 @@ public class PipesClient implements Closeable {
} else {
LOG.info("starting process");
}
- if (logGobblerThread != null) {
- logGobblerThread.interrupt();
- }
ProcessBuilder pb = new ProcessBuilder(getCommandline());
+ pb.redirectError(ProcessBuilder.Redirect.INHERIT);
process = pb.start();
- logGobbler = new LogGobbler(process.getErrorStream());
- logGobblerThread = new Thread(logGobbler);
- logGobblerThread.setDaemon(true);
- logGobblerThread.start();
input = new DataInputStream(process.getInputStream());
output = new DataOutputStream(process.getOutputStream());
//wait for ready signal
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
int b = input.read();
if (b != PipesServer.READY) {
- throw new RuntimeException("Couldn't start server: " + b);
+ throw new RuntimeException("Couldn't start server: " + b +
+ " Make absolutely certain that your logger is not
writing to stdout.");
}
return 1;
});
@@ -272,7 +262,7 @@ public class PipesClient implements Closeable {
boolean hasClassPath = false;
boolean hasHeadless = false;
boolean hasExitOnOOM = false;
-
+ boolean hasLog4j = false;
for (String arg : configArgs) {
if (arg.startsWith("-Djava.awt.headless")) {
hasHeadless = true;
@@ -284,6 +274,9 @@ public class PipesClient implements Closeable {
arg.equals("-XX:+CrashOnOutOfMemoryError")) {
hasExitOnOOM = true;
}
+ if (arg.startsWith("-Dlog4j.configuration")) {
+ hasLog4j = true;
+ }
}
List<String> commandLine = new ArrayList<>();
@@ -299,6 +292,9 @@ public class PipesClient implements Closeable {
if (! hasExitOnOOM) {
//warn
}
+ if (! hasLog4j) {
+
commandLine.add("-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
+ }
commandLine.addAll(configArgs);
commandLine.add("org.apache.tika.pipes.PipesServer");
commandLine.add(
@@ -313,44 +309,7 @@ public class PipesClient implements Closeable {
commandLine.add(maxForEmitBatchBytes);
commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
- LOG.debug("commandline: " + commandLine.toString());
+ LOG.debug("commandline: " + commandLine);
return commandLine.toArray(new String[0]);
}
-
- public static class LogGobbler implements Runnable {
- private static final Logger SERVER_LOG =
LoggerFactory.getLogger(LogGobbler.class);
-
- private final BufferedReader reader;
- public LogGobbler(InputStream is) {
- reader = new BufferedReader(new InputStreamReader(is,
StandardCharsets.UTF_8));
- }
-
- @Override
- public void run() {
- String line = null;
- try {
- line = reader.readLine();
- } catch (IOException e) {
- return;
- }
- while (line != null) {
- if (line.startsWith("debug ")) {
- SERVER_LOG.debug(line.substring(6));
- } else if (line.startsWith("info ")) {
- SERVER_LOG.info(line.substring(5));
- } else if (line.startsWith("warn ")) {
- SERVER_LOG.warn(line.substring(5));
- } else if (line.startsWith("error ")) {
- SERVER_LOG.error(line.substring(6));
- } else {
- SERVER_LOG.debug(line);
- }
- try {
- line = reader.readLine();
- } catch (IOException e) {
- return;
- }
- }
- }
- }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 6a5f6c3..7e0ab03 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -30,6 +30,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
@@ -54,8 +56,18 @@ import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;
+/**
+ * This server is forked from the PipesClient. This class isolates
+ * parsing from the client to protect the primary JVM.
+ *
+ * When configuring logging for this class, make absolutely certain
+ * not to write to STDOUT. This class uses STDOUT to communicate with
+ * the PipesClient.
+ */
public class PipesServer implements Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(PipesServer.class);
+
//this has to be some number not close to 0-3
//it looks like the server crashes with exit value 3 on OOM, for example
public static final int TIMEOUT_EXIT_CODE = 17;
@@ -139,7 +151,6 @@ public class PipesServer implements Runnable {
serverWaitTimeoutMillis);
System.setIn(new ByteArrayInputStream(new byte[0]));
System.setOut(System.err);
-
Thread watchdog = new Thread(server, "Tika Watchdog");
watchdog.setDaemon(true);
watchdog.start();
@@ -153,11 +164,11 @@ public class PipesServer implements Runnable {
synchronized (lock) {
long elapsed = System.currentTimeMillis() - since;
if (parsing && elapsed > serverParseTimeoutMillis) {
- warn("timeout server; elapsed " + elapsed + " with " +
serverParseTimeoutMillis);
+ LOG.warn("timeout server; elapsed {} with {}",
elapsed, serverParseTimeoutMillis);
exit(TIMEOUT_EXIT_CODE);
} else if (!parsing && serverWaitTimeoutMillis > 0 &&
elapsed > serverWaitTimeoutMillis) {
- debug("closing down from inactivity");
+ LOG.debug("closing down from inactivity");
exit(0);
}
}
@@ -168,38 +179,17 @@ public class PipesServer implements Runnable {
}
}
- private void debug(String msg) {
- System.err.println("debug " + msg.replaceAll("[\r\n]", " "));
- System.err.flush();
- }
-
- private void warn(String msg) {
- System.err.println("warn " + msg);
- System.err.flush();
- }
-
- private void warn(Throwable t) {
- System.err.println("warn " +
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
- System.err.flush();
- }
-
- private void err(Throwable t) {
- System.err.println("error " +
ExceptionUtils.getStackTrace(t).replaceAll("[\r\n]", " "));
- System.err.flush();
- }
-
-
public void processRequests() {
//initialize
try {
initializeParser();
} catch (Throwable t) {
- err(t);
+ LOG.error("couldn't initialize parser", t);
try {
output.writeByte(FAILED_TO_START);
output.flush();
} catch (IOException e) {
- warn(e);
+ LOG.warn("couldn't notify of failure to start", e);
}
return;
}
@@ -222,9 +212,7 @@ public class PipesServer implements Runnable {
output.flush();
}
} catch (Throwable t) {
- t.printStackTrace();
- err(t);
- System.err.println("exiting");
+ LOG.error("main loop error (did the forking process shut down?)",
t);
exit(1);
}
System.err.flush();
@@ -300,7 +288,7 @@ public class PipesServer implements Runnable {
} catch (SecurityException e) {
throw e;
} catch (TikaException | IOException e) {
- warn(e);
+ LOG.warn("fetch exception", e);
throw new FetchException(e);
} catch (OutOfMemoryError e) {
handleOOM(e);
@@ -334,8 +322,7 @@ public class PipesServer implements Runnable {
try {
return fetcherManager.getFetcher(fetcherName);
} catch (TikaException | IOException e) {
- warn(e);
- //LOG.error("can't get fetcher", e);
+ LOG.error("can't load fetcher", e);
throw new FetchException(e);
}
}
@@ -347,7 +334,7 @@ public class PipesServer implements Runnable {
} catch (IOException e) {
//swallow at this point
}
- err(oom);
+ LOG.error("oom", oom);
exit(1);
}
@@ -364,18 +351,14 @@ public class PipesServer implements Runnable {
try {
parser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
- warn(e);
- //LOG.warn("problem:" + fetchKey.getFetchKey(), e);
+ LOG.warn("sax problem:" + fetchEmitTuple.getId(), e);
} catch (EncryptedDocumentException e) {
- warn(e);
- //LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
+ LOG.warn("encrypted document:" + fetchEmitTuple.getId(), e);
} catch (SecurityException e) {
- warn(e);
- //LOG.warn("security exception: " + fetchKey.getFetchKey());
+ LOG.warn("security exception:" + fetchEmitTuple.getId(), e);
throw e;
} catch (Exception e) {
- warn(e);
- //LOG.warn("exception: " + fetchKey.getFetchKey());
+ LOG.warn("exception: " + fetchEmitTuple.getId(), e);
} catch (OutOfMemoryError e) {
//TODO, maybe return file type gathered so far and then crash?
//LOG.error("oom: " + fetchKey.getFetchKey());
@@ -396,8 +379,7 @@ public class PipesServer implements Runnable {
private void exit(int exitCode) {
- System.err.println("exiting: " + exitCode);
- System.err.flush();
+ LOG.warn("exiting: {}", exitCode);
System.exit(exitCode);
}
@@ -412,12 +394,10 @@ public class PipesServer implements Runnable {
return (FetchEmitTuple) objectInputStream.readObject();
}
} catch (IOException e) {
- err(e);
- //LOG.error("problem reading tuple", e);
+ LOG.error("problem reading tuple", e);
exit(1);
} catch (ClassNotFoundException e) {
- err(e);
- //LOG.error("can't find class?!", e);
+ LOG.error("can't find class?!", e);
exit(1);
}
//unreachable, no?!
@@ -448,8 +428,7 @@ public class PipesServer implements Runnable {
}
write(PARSE_SUCCESS, bos.toByteArray());
} catch (IOException e) {
- err(e);
- //LOG.error("problem writing emit data", e);
+ LOG.error("problem writing emit data (forking process shutdown?)",
e);
exit(1);
}
}
@@ -462,7 +441,7 @@ public class PipesServer implements Runnable {
output.write(bytes);
output.flush();
} catch (IOException e) {
- err(e);
+ LOG.error("problem writing data (forking process shutdown?)", e);
exit(1);
}
}
@@ -472,7 +451,7 @@ public class PipesServer implements Runnable {
output.write(status);
output.flush();
} catch (IOException e) {
- err(e);
+ LOG.error("problem writing data (forking process shutdown?)", e);
exit(1);
}
}
diff --git a/tika-core/src/main/resources/pipes-fork-server-default-log4j2.xml
b/tika-core/src/main/resources/pipes-fork-server-default-log4j2.xml
new file mode 100644
index 0000000..5f946e6
--- /dev/null
+++ b/tika-core/src/main/resources/pipes-fork-server-default-log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<Configuration status="INFO">
+ <Appenders>
+ <Console name="console" target="SYSTEM_ERR">
+ <PatternLayout
+ pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info" additivity="false">
+ <AppenderRef ref="console" />
+ </Root>
+ </Loggers>
+</Configuration>
diff --git
a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTikaBinTest.java
b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTikaBinTest.java
index 062024d..edb4e89 100644
--- a/tika-core/src/test/java/org/apache/tika/fork/ForkParserTikaBinTest.java
+++ b/tika-core/src/test/java/org/apache/tika/fork/ForkParserTikaBinTest.java
@@ -63,9 +63,8 @@ public class ForkParserTikaBinTest extends TikaTest {
try (JarOutputStream jarOs = new
JarOutputStream(Files.newOutputStream(JAR_FILE))) {
ClassLoader loader = ForkServer.class.getClassLoader();
ClassPath classPath = ClassPath.from(loader);
-
addClasses(jarOs, classPath, ci ->
ci.getPackageName().startsWith("org.slf4j"));
- addClasses(jarOs, classPath, ci ->
ci.getPackageName().startsWith("org.apache.log4j"));
+ addClasses(jarOs, classPath, ci ->
ci.getPackageName().startsWith("org.apache.logging"));
addClasses(jarOs, classPath,
ci ->
ci.getPackageName().startsWith("org.apache.commons.io"));
//exclude TypeDetectionBenchmark because it is not serializable
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
index 8d80c32..e8a0cd2 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
@@ -85,9 +85,9 @@ public abstract class TikaPipesSolrTestBase {
protected void runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(boolean
useZk)
throws Exception {
File tikaConfigFile = new File("target", "ta.xml");
- File log4jPropFile = new File("target", "tmp-log4j2.properties");
+ File log4jPropFile = new File("target", "tmp-log4j2.xml");
try (InputStream is = PipeIntegrationTests.class
- .getResourceAsStream("/tika-async-log4j2.properties")) {
+ .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
FileUtils.copyInputStreamToFile(is, log4jPropFile);
}
String tikaConfigTemplateXml;
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
new file mode 100644
index 0000000..5f946e6
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/pipes-fork-server-custom-log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<Configuration status="INFO">
+ <Appenders>
+ <Console name="console" target="SYSTEM_ERR">
+ <PatternLayout
+ pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info" additivity="false">
+ <AppenderRef ref="console" />
+ </Root>
+ </Loggers>
+</Configuration>