Author: kwright
Date: Fri May 12 06:14:06 2017
New Revision: 1794928
URL: http://svn.apache.org/viewvc?rev=1794928&view=rev
Log:
Add a non-trivial check() method
Modified:
manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java
Modified:
manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java?rev=1794928&r1=1794927&r2=1794928&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java	(original)
+++ manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java	Fri May 12 06:14:06 2017
@@ -52,6 +52,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -111,11 +112,14 @@ public class TikaExtractor extends org.a
protected final static URI metaURI;
/** Content URI */
protected final static URI contentURI;
+ /** Check URI */
+ protected final static URI checkURI;
static {
try {
metaURI = new URI("/meta");
contentURI = new URI("/tika");
+ checkURI = new URI("/detect/stream");
} catch (URISyntaxException e) {
throw new RuntimeException(e.getMessage());
}
@@ -246,7 +250,20 @@ public class TikaExtractor extends org.a
throws ManifoldCFException
{
getSession();
- // MHL
+ final HttpPut httpPut = new HttpPut(metaURI);
+ httpPut.addHeader("Accept", "application/json");
+ final HttpEntity entity = new InputStreamEntity(new
ByteArrayInputStream("this is a test".getBytes(StandardCharsets.UTF_8)));
+ httpPut.setEntity(entity);
+ HttpResponse response;
+ try {
+ response = this.httpClient.execute(tikaHost, httpPut);
+ } catch (IOException e) {
+ return "Connection error: "+e.getMessage();
+ }
+ final int responseCode = response.getStatusLine().getStatusCode();
+ if (response.getStatusLine().getStatusCode() != 200) {
+ return "Bad response: "+response.getStatusLine();
+ }
return super.check();
}
@@ -539,203 +556,208 @@ public class TikaExtractor extends org.a
// Tika-extracted metadata
// (4) Call downstream document processing
- DestinationStorage ds;
-
+ final DestinationStorage ds;
if (document.getBinaryLength() <= inMemoryMaximumFile) {
ds = new MemoryDestinationStorage((int) document.getBinaryLength());
} else {
ds = new FileDestinationStorage();
}
try {
- Metadata metadata = new Metadata();
- if (document.getFileName() != null) {
- metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY,
document.getFileName());
- metadata.add("stream_name", document.getFileName());
- }
- if (document.getMimeType() != null) {
- mime = document.getMimeType();
- metadata.add("Content-Type", mime);
- }
- metadata.add("stream_size", new
Long(document.getBinaryLength()).toString());
+ DestinationStorage responseDs = null;
+ try {
+ Metadata metadata = new Metadata();
+ if (document.getFileName() != null) {
+ metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY,
document.getFileName());
+ metadata.add("stream_name", document.getFileName());
+ }
+ if (document.getMimeType() != null) {
+ mime = document.getMimeType();
+ metadata.add("Content-Type", mime);
+ }
+ metadata.add("stream_size", new
Long(document.getBinaryLength()).toString());
- // We only log the extraction
- long startTime = System.currentTimeMillis();
- String resultCode = "OK";
- String description = null;
- Long length = null;
+ // We only log the extraction
+ long startTime = System.currentTimeMillis();
+ String resultCode = "OK";
+ String description = null;
+ Long length = null;
- try {
try {
-
- // Make a copy of the original stream as it needs to be sent two
- // times to Tika
- // one for the metadata and one for the content
- IOUtils.copy(document.getBinaryStream(), ds.getOutputStream());
-
- // Metadata
- HttpPut httpPut = new HttpPut(metaURI);
- if (!mime.isEmpty()) {
- httpPut.addHeader("Content-Type", mime);
- }
- httpPut.addHeader("Accept", "application/json");
- HttpEntity entity = new InputStreamEntity(ds.getInputStream());
- httpPut.setEntity(entity);
try {
- response = this.httpClient.execute(tikaHost, httpPut);
- } catch (IOException e) {
- // Retry 3 times, 10000 ms between retries, and abort if doesn't
work
- final long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Tika down, retrying:
"+e.getMessage(),e,currentTime + 10000L,
- -1L,3,true);
- }
- int responseCode = response.getStatusLine().getStatusCode();
- if (response.getStatusLine().getStatusCode() == 200 ||
response.getStatusLine().getStatusCode() == 204) {
- tikaServerIs = response.getEntity().getContent();
+
+ // Make a copy of the original stream as it needs to be sent two
+ // times to Tika
+ // one for the metadata and one for the content
+ IOUtils.copy(document.getBinaryStream(), ds.getOutputStream());
+
+ // Metadata
+ HttpPut httpPut = new HttpPut(metaURI);
+ if (!mime.isEmpty()) {
+ httpPut.addHeader("Content-Type", mime);
+ }
+ httpPut.addHeader("Accept", "application/json");
+ HttpEntity entity = new InputStreamEntity(ds.getInputStream());
+ httpPut.setEntity(entity);
try {
- final BufferedReader br = new BufferedReader(new
InputStreamReader(tikaServerIs));
- final JSONParser parser = new JSONParser();
- JSONObject metaJson;
- final StringBuilder sb = new StringBuilder();
- String output;
- while ((output = br.readLine()) != null) {
- sb.append(output);
- }
- metaJson = (JSONObject) parser.parse(sb.toString());
- for (Object key : metaJson.keySet()) {
- metadata.add(key.toString(), metaJson.get(key).toString());
- }
- } finally {
- tikaServerIs.close();
+ response = this.httpClient.execute(tikaHost, httpPut);
+ } catch (IOException e) {
+ // Retry 3 times, 10000 ms between retries, and abort if doesn't
work
+ final long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Tika down, retrying:
"+e.getMessage(),e,currentTime + 10000L,
+ -1L,3,true);
}
- } else {
- activities.noDocument();
- if (responseCode == 422) {
- resultCode = "TIKASERVERREJECTS";
- description = "Tika Server rejected document with the following
reason: "
- + response.getStatusLine().getReasonPhrase();
- handleTikaServerRejects(description);
+ int responseCode = response.getStatusLine().getStatusCode();
+ if (response.getStatusLine().getStatusCode() == 200 ||
response.getStatusLine().getStatusCode() == 204) {
+ tikaServerIs = response.getEntity().getContent();
+ try {
+ final BufferedReader br = new BufferedReader(new
InputStreamReader(tikaServerIs));
+ final JSONParser parser = new JSONParser();
+ JSONObject metaJson;
+ final StringBuilder sb = new StringBuilder();
+ String output;
+ while ((output = br.readLine()) != null) {
+ sb.append(output);
+ }
+ metaJson = (JSONObject) parser.parse(sb.toString());
+ for (Object key : metaJson.keySet()) {
+ metadata.add(key.toString(), metaJson.get(key).toString());
+ }
+ } finally {
+ tikaServerIs.close();
+ }
} else {
- resultCode = "TIKASERVERERROR";
- description = "Tika Server failed to parse document with the
following error: "
- + response.getStatusLine().getReasonPhrase();
- handleTikaServerError(description);
+ activities.noDocument();
+ if (responseCode == 422) {
+ resultCode = "TIKASERVERREJECTS";
+ description = "Tika Server rejected document with the
following reason: "
+ + response.getStatusLine().getReasonPhrase();
+ handleTikaServerRejects(description);
+ } else {
+ resultCode = "TIKASERVERERROR";
+ description = "Tika Server failed to parse document with the
following error: "
+ + response.getStatusLine().getReasonPhrase();
+ handleTikaServerError(description);
+ }
+ return DOCUMENTSTATUS_REJECTED;
}
- return DOCUMENTSTATUS_REJECTED;
- }
- // Content
- httpPut = new HttpPut(contentURI);
- if (!mime.isEmpty()) {
- httpPut.addHeader("Content-Type", mime);
- }
- httpPut.addHeader("Accept", "text/plain");
- entity = new InputStreamEntity(ds.getInputStream());
- httpPut.setEntity(entity);
- try {
- response = this.httpClient.execute(tikaHost, httpPut);
- } catch (IOException e) {
- // Retry 3 times, 10000 ms between retries, and abort if doesn't
work
- final long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Tika down, retrying:
"+e.getMessage(),e,currentTime + 10000L,
- -1L,3,true);
- }
-
- responseCode = response.getStatusLine().getStatusCode();
- if (response.getStatusLine().getStatusCode() == 200 ||
response.getStatusLine().getStatusCode() == 204) {
- tikaServerIs = response.getEntity().getContent();
+ // Content
+ httpPut = new HttpPut(contentURI);
+ if (!mime.isEmpty()) {
+ httpPut.addHeader("Content-Type", mime);
+ }
+ httpPut.addHeader("Accept", "text/plain");
+ entity = new InputStreamEntity(ds.getInputStream());
+ httpPut.setEntity(entity);
try {
- ds.close();
- ds = new FileDestinationStorage();
- IOUtils.copyLarge(tikaServerIs, ds.getOutputStream(), 0L,
sp.writeLimit);
- length = new Long(ds.getBinaryLength());
- } finally {
- tikaServerIs.close();
+ response = this.httpClient.execute(tikaHost, httpPut);
+ } catch (IOException e) {
+ // Retry 3 times, 10000 ms between retries, and abort if doesn't
work
+ final long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Tika down, retrying:
"+e.getMessage(),e,currentTime + 10000L,
+ -1L,3,true);
}
- } else {
- activities.noDocument();
- if (responseCode == 422) {
- resultCode = "TIKASERVERREJECTS";
- description = "Tika Server rejected document with the following
reason: "
- + response.getStatusLine().getReasonPhrase();
- return handleTikaServerRejects(description);
+
+ responseCode = response.getStatusLine().getStatusCode();
+ if (response.getStatusLine().getStatusCode() == 200 ||
response.getStatusLine().getStatusCode() == 204) {
+ tikaServerIs = response.getEntity().getContent();
+ try {
+ responseDs = new FileDestinationStorage();
+ IOUtils.copyLarge(tikaServerIs, responseDs.getOutputStream(),
0L, sp.writeLimit);
+ length = new Long(responseDs.getBinaryLength());
+ } finally {
+ tikaServerIs.close();
+ }
} else {
- resultCode = "TIKASERVERERROR";
- description = "Tika Server failed to parse document with the
following error: "
- + response.getStatusLine().getReasonPhrase();
- return handleTikaServerError(description);
+ activities.noDocument();
+ if (responseCode == 422) {
+ resultCode = "TIKASERVERREJECTS";
+ description = "Tika Server rejected document with the
following reason: "
+ + response.getStatusLine().getReasonPhrase();
+ return handleTikaServerRejects(description);
+ } else {
+ resultCode = "TIKASERVERERROR";
+ description = "Tika Server failed to parse document with the
following error: "
+ + response.getStatusLine().getReasonPhrase();
+ return handleTikaServerError(description);
+ }
}
- }
- } catch (IOException | ParseException e) {
- resultCode = "TIKASERVERRESPONSEISSUE";
- description = e.getMessage();
- int rval;
- if (e instanceof IOException) {
- rval = handleTikaServerException((IOException) e);
- } else {
- rval = handleTikaServerException((ParseException) e);
+ } catch (IOException | ParseException e) {
+ resultCode = "TIKASERVERRESPONSEISSUE";
+ description = e.getMessage();
+ int rval;
+ if (e instanceof IOException) {
+ rval = handleTikaServerException((IOException) e);
+ } else {
+ rval = handleTikaServerException((ParseException) e);
+ }
+ if (rval == DOCUMENTSTATUS_REJECTED) {
+ activities.noDocument();
+ }
+ return rval;
}
- if (rval == DOCUMENTSTATUS_REJECTED) {
+
+ if (!activities.checkLengthIndexable(responseDs.getBinaryLength())) {
activities.noDocument();
+ resultCode = activities.EXCLUDED_LENGTH;
+ description = "Downstream pipeline rejected document with length "
+ ds.getBinaryLength();
+ return DOCUMENTSTATUS_REJECTED;
}
- return rval;
- }
- if (!activities.checkLengthIndexable(ds.getBinaryLength())) {
- activities.noDocument();
- resultCode = activities.EXCLUDED_LENGTH;
- description = "Downstream pipeline rejected document with length " +
ds.getBinaryLength();
- return DOCUMENTSTATUS_REJECTED;
+ } finally {
+ // Log the extraction processing
+ activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT,
length, documentURI, resultCode, description);
}
- } finally {
- // Log the extraction processing
- activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT,
length, documentURI, resultCode, description);
- }
-
- // Parsing complete!
- // Create a copy of Repository Document
- RepositoryDocument docCopy = document.duplicate();
-
- // Get new stream length
- long newBinaryLength = ds.getBinaryLength();
- // Open new input stream
- InputStream is = ds.getInputStream();
+ // Parsing complete!
+ // Create a copy of Repository Document
+ final RepositoryDocument docCopy = document.duplicate();
+
+ // Get new stream length
+ final long newBinaryLength = responseDs.getBinaryLength();
+ // Open new input stream
+ final InputStream is = responseDs.getInputStream();
- try {
- docCopy.setBinary(is, newBinaryLength);
+ try {
+ docCopy.setBinary(is, newBinaryLength);
- // Set up all metadata from Tika. We may want to run this through a
- // mapper eventually...
- String[] metaNames = metadata.names();
- for (String mName : metaNames) {
- String value = metadata.get(mName);
- if (sp.lowerNames()) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < mName.length(); i++) {
- char ch = mName.charAt(i);
- if (!Character.isLetterOrDigit(ch))
- ch = '_';
- else
- ch = Character.toLowerCase(ch);
- sb.append(ch);
+ // Set up all metadata from Tika. We may want to run this through a
+ // mapper eventually...
+ String[] metaNames = metadata.names();
+ for (String mName : metaNames) {
+ String value = metadata.get(mName);
+ if (sp.lowerNames()) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < mName.length(); i++) {
+ char ch = mName.charAt(i);
+ if (!Character.isLetterOrDigit(ch))
+ ch = '_';
+ else
+ ch = Character.toLowerCase(ch);
+ sb.append(ch);
+ }
+ mName = sb.toString();
}
- mName = sb.toString();
- }
- String target = sp.getMapping(mName);
- if (target != null) {
- docCopy.addField(target, value);
- } else {
- if (sp.keepAllMetadata()) {
- docCopy.addField(mName, value);
+ String target = sp.getMapping(mName);
+ if (target != null) {
+ docCopy.addField(target, value);
+ } else {
+ if (sp.keepAllMetadata()) {
+ docCopy.addField(mName, value);
+ }
}
}
- }
- // Send new document downstream
- return activities.sendDocument(documentURI, docCopy);
+ // Send new document downstream
+ return activities.sendDocument(documentURI, docCopy);
+ } finally {
+ is.close();
+ }
} finally {
- is.close();
+ if (responseDs != null) {
+ responseDs.close();
+ }
}
} finally {
ds.close();