Author: tallison
Date: Wed Apr 1 18:52:08 2015
New Revision: 1670751
URL: http://svn.apache.org/r1670751
Log:
TIKA-1330 clean up logging in tika-batch and tika-app integration of
tika-batch, take 2
Modified:
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
Modified:
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java?rev=1670751&r1=1670750&r2=1670751&view=diff
==============================================================================
---
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
(original)
+++
tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
Wed Apr 1 18:52:08 2015
@@ -20,61 +20,61 @@ package org.apache.tika.batch;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Date;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.ContentHandler;
-
-
-/**
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.ContentHandler;
+
+
+/**
* This is a base class for file consumers. The
* goal of this class is to abstract out the multithreading
* and recordkeeping components.
* <p/>
- */
-public abstract class FileResourceConsumer implements
Callable<IFileProcessorFutureResult> {
-
- private enum STATE {
- NOT_YET_STARTED,
- ACTIVELY_CONSUMING,
- SWALLOWED_POISON,
+ */
+public abstract class FileResourceConsumer implements
Callable<IFileProcessorFutureResult> {
+
+ private enum STATE {
+ NOT_YET_STARTED,
+ ACTIVELY_CONSUMING,
+ SWALLOWED_POISON,
THREAD_INTERRUPTED,
EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
ASKED_TO_SHUTDOWN,
TIMED_OUT,
CONSUMER_EXCEPTION,
CONSUMER_ERROR,
- COMPLETED
- }
-
- public static String TIMED_OUT = "timed_out";
- public static String OOM = "oom";
- public static String IO_IS = "io_on_inputstream";
- public static String IO_OS = "io_on_outputstream";
- public static String PARSE_ERR = "parse_err";
- public static String PARSE_EX = "parse_ex";
-
- public static String ELAPSED_MILLIS = "elapsedMS";
-
- private static AtomicInteger numConsumers = new AtomicInteger(-1);
- protected static Logger logger =
LoggerFactory.getLogger(FileResourceConsumer.class);
-
- private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes
-
+ COMPLETED
+ }
+
+ public static String TIMED_OUT = "timed_out";
+ public static String OOM = "oom";
+ public static String IO_IS = "io_on_inputstream";
+ public static String IO_OS = "io_on_outputstream";
+ public static String PARSE_ERR = "parse_err";
+ public static String PARSE_EX = "parse_ex";
+
+ public static String ELAPSED_MILLIS = "elapsedMS";
+
+ private static AtomicInteger numConsumers = new AtomicInteger(-1);
+ protected static Logger logger =
LoggerFactory.getLogger(FileResourceConsumer.class);
+
+ private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes
+
private final ArrayBlockingQueue<FileResource> fileQueue;
private final XMLOutputFactory xmlOutputFactory =
XMLOutputFactory.newFactory();
@@ -257,35 +257,35 @@ public abstract class FileResourceConsum
FileStarted tmp = currentFile;
if (tmp == null) {
return null;
- }
- if (tmp.getElapsedMillis() > staleThresholdMillis) {
- setEndedState(STATE.TIMED_OUT);
- logger.error("{}", getXMLifiedLogMsg(
- TIMED_OUT,
- tmp.getResourceId(),
- ELAPSED_MILLIS,
Long.toString(tmp.getElapsedMillis())));
- return tmp;
- }
- }
- return null;
- }
-
- protected String getXMLifiedLogMsg(String type, String resourceId,
String... attrs) {
- return getXMLifiedLogMsg(type, resourceId, null, attrs);
- }
-
- /**
- * Use this for structured output that captures resourceId and other
attributes.
- *
- * @param type entity name for exception
- * @param resourceId resourceId string
- * @param t throwable can be null
- * @param attrs (array of key0, value0, key1, value1, etc.)
- */
- protected String getXMLifiedLogMsg(String type, String resourceId,
Throwable t, String... attrs) {
-
- StringWriter writer = new StringWriter();
- try {
+ }
+ if (tmp.getElapsedMillis() > staleThresholdMillis) {
+ setEndedState(STATE.TIMED_OUT);
+ logger.error("{}", getXMLifiedLogMsg(
+ TIMED_OUT,
+ tmp.getResourceId(),
+ ELAPSED_MILLIS,
Long.toString(tmp.getElapsedMillis())));
+ return tmp;
+ }
+ }
+ return null;
+ }
+
+ protected String getXMLifiedLogMsg(String type, String resourceId,
String... attrs) {
+ return getXMLifiedLogMsg(type, resourceId, null, attrs);
+ }
+
+ /**
+ * Use this for structured output that captures resourceId and other
attributes.
+ *
+ * @param type entity name for exception
+ * @param resourceId resourceId string
+ * @param t throwable can be null
+ * @param attrs (array of key0, value0, key1, value1, etc.)
+ */
+ protected String getXMLifiedLogMsg(String type, String resourceId,
Throwable t, String... attrs) {
+
+ StringWriter writer = new StringWriter();
+ try {
XMLStreamWriter xml =
xmlOutputFactory.createXMLStreamWriter(writer);
xml.writeStartDocument();
xml.writeStartElement(type);
@@ -306,13 +306,13 @@ public abstract class FileResourceConsum
xml.writeEndDocument();
xml.flush();
xml.close();
- } catch (XMLStreamException e) {
- logger.error("error writing xml stream for: " + resourceId, t);
- }
- return writer.toString();
- }
-
- private FileResource getNextFileResource() throws InterruptedException {
+ } catch (XMLStreamException e) {
+ logger.error("error writing xml stream for: " + resourceId, t);
+ }
+ return writer.toString();
+ }
+
+ private FileResource getNextFileResource() throws InterruptedException {
FileResource fileResource = null;
long start = new Date().getTime();
while (fileResource == null) {
@@ -382,49 +382,46 @@ public abstract class FileResourceConsum
synchronized(lock) {
if (currentState == STATE.NOT_YET_STARTED ||
currentState == STATE.ACTIVELY_CONSUMING ||
- currentState == STATE.ASKED_TO_SHUTDOWN) {
- currentState = cause;
- }
- }
- }
-
- /**
- * Utility method to handle logging equivalently among all
- * implementing classes. Use, override or avoid as desired.
- * <p>
- * This will throw Errors, but it will catch all Exceptions and log them
- * @param resourceId resourceId
- * @param parser parser to use
- * @param is inputStream (will be closed by this method!)
- * @param handler handler for the content
- * @param m metadata
- * @param parseContext parse context
- * @throws Throwable
- */
- protected void parse(final String resourceId, final Parser parser,
InputStream is,
- final ContentHandler handler,
- final Metadata m, final ParseContext parseContext)
throws Throwable {
-
- try {
- parser.parse(is, handler, m, parseContext);
- } catch (Throwable t) {
- if (t instanceof OutOfMemoryError) {
- logger.error(getXMLifiedLogMsg(OOM,
- resourceId, t));
- throw t;
- } else if (t instanceof Error) {
- logger.error(getXMLifiedLogMsg(PARSE_ERR,
- resourceId, t));
- throw t;
- } else {
- //warn, but do not rethrow
- logger.warn(getXMLifiedLogMsg(PARSE_EX,
- resourceId, t));
- incrementHandledExceptions();
- }
- } finally {
- close(is);
- }
- }
-
+ currentState == STATE.ASKED_TO_SHUTDOWN) {
+ currentState = cause;
+ }
+ }
+ }
+
+ /**
+ * Utility method to handle logging equivalently among all
+ * implementing classes. Use, override or avoid as desired.
+ *
+ * @param resourceId resourceId
+ * @param parser parser to use
+ * @param is inputStream (will be closed by this method!)
+ * @param handler handler for the content
+ * @param m metadata
+ * @param parseContext parse context
+ * @throws Throwable logs and then rethrows whatever was thrown (if
anything)
+ */
+ protected void parse(final String resourceId, final Parser parser,
InputStream is,
+ final ContentHandler handler,
+ final Metadata m, final ParseContext parseContext)
throws Throwable {
+
+ try {
+ parser.parse(is, handler, m, parseContext);
+ } catch (Throwable t) {
+ if (t instanceof OutOfMemoryError) {
+ logger.error(getXMLifiedLogMsg(OOM,
+ resourceId, t));
+ } else if (t instanceof Error) {
+ logger.error(getXMLifiedLogMsg(PARSE_ERR,
+ resourceId, t));
+ } else {
+ logger.warn(getXMLifiedLogMsg(PARSE_EX,
+ resourceId, t));
+ incrementHandledExceptions();
+ }
+ throw t;
+ } finally {
+ close(is);
+ }
+ }
+
}