Author: kwright
Date: Fri May 12 06:14:06 2017
New Revision: 1794928

URL: http://svn.apache.org/viewvc?rev=1794928&view=rev
Log:
Add a non-trivial check() method

Modified:
    manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java

Modified: manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java?rev=1794928&r1=1794927&r2=1794928&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java (original)
+++ manifoldcf/branches/CONNECTORS-1425/connectors/tikaservice/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/TikaExtractor.java Fri May 12 06:14:06 2017
@@ -52,6 +52,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
 
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -111,11 +112,14 @@ public class TikaExtractor extends org.a
   protected final static URI metaURI;
   /** Content URI */
   protected final static URI contentURI;
+  /** Check URI */
+  protected final static URI checkURI;
   
   static {
     try {
       metaURI = new URI("/meta");
       contentURI = new URI("/tika");
+      checkURI = new URI("/detect/stream");
     } catch (URISyntaxException e) {
       throw new RuntimeException(e.getMessage());
     }
@@ -246,7 +250,20 @@ public class TikaExtractor extends org.a
     throws ManifoldCFException
   {
     getSession();
-    // MHL
+    final HttpPut httpPut = new HttpPut(metaURI);
+    httpPut.addHeader("Accept", "application/json");
+    final HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream("this is a test".getBytes(StandardCharsets.UTF_8)));
+    httpPut.setEntity(entity);
+    HttpResponse response;
+    try {
+      response = this.httpClient.execute(tikaHost, httpPut);
+    } catch (IOException e) {
+      return "Connection error: "+e.getMessage();
+    }
+    final int responseCode = response.getStatusLine().getStatusCode();
+    if (response.getStatusLine().getStatusCode() != 200) {
+      return "Bad response: "+response.getStatusLine();
+    }
     return super.check();
   }
   
@@ -539,203 +556,208 @@ public class TikaExtractor extends org.a
     // Tika-extracted metadata
     // (4) Call downstream document processing
 
-    DestinationStorage ds;
-
+    final DestinationStorage ds;
     if (document.getBinaryLength() <= inMemoryMaximumFile) {
       ds = new MemoryDestinationStorage((int) document.getBinaryLength());
     } else {
       ds = new FileDestinationStorage();
     }
     try {
-      Metadata metadata = new Metadata();
-      if (document.getFileName() != null) {
-        metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY, document.getFileName());
-        metadata.add("stream_name", document.getFileName());
-      }
-      if (document.getMimeType() != null) {
-        mime = document.getMimeType();
-        metadata.add("Content-Type", mime);
-      }
-      metadata.add("stream_size", new Long(document.getBinaryLength()).toString());
+      DestinationStorage responseDs = null;
+      try {
+        Metadata metadata = new Metadata();
+        if (document.getFileName() != null) {
+          metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY, document.getFileName());
+          metadata.add("stream_name", document.getFileName());
+        }
+        if (document.getMimeType() != null) {
+          mime = document.getMimeType();
+          metadata.add("Content-Type", mime);
+        }
+        metadata.add("stream_size", new Long(document.getBinaryLength()).toString());
 
-      // We only log the extraction
-      long startTime = System.currentTimeMillis();
-      String resultCode = "OK";
-      String description = null;
-      Long length = null;
+        // We only log the extraction
+        long startTime = System.currentTimeMillis();
+        String resultCode = "OK";
+        String description = null;
+        Long length = null;
 
-      try {
         try {
-
-          // Make a copy of the original stream as it needs to be sent two
-          // times to Tika
-          // one for the metadata and one for the content
-          IOUtils.copy(document.getBinaryStream(), ds.getOutputStream());
-
-          // Metadata
-          HttpPut httpPut = new HttpPut(metaURI);
-          if (!mime.isEmpty()) {
-            httpPut.addHeader("Content-Type", mime);
-          }
-          httpPut.addHeader("Accept", "application/json");
-          HttpEntity entity = new InputStreamEntity(ds.getInputStream());
-          httpPut.setEntity(entity);
           try {
-            response = this.httpClient.execute(tikaHost, httpPut);
-          } catch (IOException e) {
-            // Retry 3 times, 10000 ms between retries, and abort if doesn't work
-            final long currentTime = System.currentTimeMillis();
-            throw new ServiceInterruption("Tika down, retrying: "+e.getMessage(),e,currentTime + 10000L,
-              -1L,3,true);
-          }
-          int responseCode = response.getStatusLine().getStatusCode();
-          if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 204) {
-            tikaServerIs = response.getEntity().getContent();
+
+            // Make a copy of the original stream as it needs to be sent two
+            // times to Tika
+            // one for the metadata and one for the content
+            IOUtils.copy(document.getBinaryStream(), ds.getOutputStream());
+
+            // Metadata
+            HttpPut httpPut = new HttpPut(metaURI);
+            if (!mime.isEmpty()) {
+              httpPut.addHeader("Content-Type", mime);
+            }
+            httpPut.addHeader("Accept", "application/json");
+            HttpEntity entity = new InputStreamEntity(ds.getInputStream());
+            httpPut.setEntity(entity);
             try {
-              final BufferedReader br = new BufferedReader(new InputStreamReader(tikaServerIs));
-              final JSONParser parser = new JSONParser();
-              JSONObject metaJson;
-              final StringBuilder sb = new StringBuilder();
-              String output;
-              while ((output = br.readLine()) != null) {
-                sb.append(output);
-              }
-              metaJson = (JSONObject) parser.parse(sb.toString());
-              for (Object key : metaJson.keySet()) {
-                metadata.add(key.toString(), metaJson.get(key).toString());
-              }
-            } finally {
-              tikaServerIs.close();
+              response = this.httpClient.execute(tikaHost, httpPut);
+            } catch (IOException e) {
+              // Retry 3 times, 10000 ms between retries, and abort if doesn't work
+              final long currentTime = System.currentTimeMillis();
+              throw new ServiceInterruption("Tika down, retrying: "+e.getMessage(),e,currentTime + 10000L,
+                -1L,3,true);
             }
-          } else {
-            activities.noDocument();
-            if (responseCode == 422) {
-              resultCode = "TIKASERVERREJECTS";
-              description = "Tika Server rejected document with the following reason: "
-                  + response.getStatusLine().getReasonPhrase();
-              handleTikaServerRejects(description);
+            int responseCode = response.getStatusLine().getStatusCode();
+            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 204) {
+              tikaServerIs = response.getEntity().getContent();
+              try {
+                final BufferedReader br = new BufferedReader(new InputStreamReader(tikaServerIs));
+                final JSONParser parser = new JSONParser();
+                JSONObject metaJson;
+                final StringBuilder sb = new StringBuilder();
+                String output;
+                while ((output = br.readLine()) != null) {
+                  sb.append(output);
+                }
+                metaJson = (JSONObject) parser.parse(sb.toString());
+                for (Object key : metaJson.keySet()) {
+                  metadata.add(key.toString(), metaJson.get(key).toString());
+                }
+              } finally {
+                tikaServerIs.close();
+              }
             } else {
-              resultCode = "TIKASERVERERROR";
-              description = "Tika Server failed to parse document with the following error: "
-                  + response.getStatusLine().getReasonPhrase();
-              handleTikaServerError(description);
+              activities.noDocument();
+              if (responseCode == 422) {
+                resultCode = "TIKASERVERREJECTS";
+                description = "Tika Server rejected document with the following reason: "
+                    + response.getStatusLine().getReasonPhrase();
+                handleTikaServerRejects(description);
+              } else {
+                resultCode = "TIKASERVERERROR";
+                description = "Tika Server failed to parse document with the following error: "
+                    + response.getStatusLine().getReasonPhrase();
+                handleTikaServerError(description);
+              }
+              return DOCUMENTSTATUS_REJECTED;
             }
-            return DOCUMENTSTATUS_REJECTED;
-          }
 
-          // Content
-          httpPut = new HttpPut(contentURI);
-          if (!mime.isEmpty()) {
-            httpPut.addHeader("Content-Type", mime);
-          }
-          httpPut.addHeader("Accept", "text/plain");
-          entity = new InputStreamEntity(ds.getInputStream());
-          httpPut.setEntity(entity);
-          try {
-            response = this.httpClient.execute(tikaHost, httpPut);
-          } catch (IOException e) {
-            // Retry 3 times, 10000 ms between retries, and abort if doesn't work
-            final long currentTime = System.currentTimeMillis();
-            throw new ServiceInterruption("Tika down, retrying: "+e.getMessage(),e,currentTime + 10000L,
-              -1L,3,true);
-          }
-
-          responseCode = response.getStatusLine().getStatusCode();
-          if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 204) {
-            tikaServerIs = response.getEntity().getContent();
+            // Content
+            httpPut = new HttpPut(contentURI);
+            if (!mime.isEmpty()) {
+              httpPut.addHeader("Content-Type", mime);
+            }
+            httpPut.addHeader("Accept", "text/plain");
+            entity = new InputStreamEntity(ds.getInputStream());
+            httpPut.setEntity(entity);
             try {
-              ds.close();
-              ds = new FileDestinationStorage();
-              IOUtils.copyLarge(tikaServerIs, ds.getOutputStream(), 0L, sp.writeLimit);
-              length = new Long(ds.getBinaryLength());
-            } finally {
-              tikaServerIs.close();
+              response = this.httpClient.execute(tikaHost, httpPut);
+            } catch (IOException e) {
+              // Retry 3 times, 10000 ms between retries, and abort if doesn't work
+              final long currentTime = System.currentTimeMillis();
+              throw new ServiceInterruption("Tika down, retrying: "+e.getMessage(),e,currentTime + 10000L,
+                -1L,3,true);
             }
-          } else {
-            activities.noDocument();
-            if (responseCode == 422) {
-              resultCode = "TIKASERVERREJECTS";
-              description = "Tika Server rejected document with the following reason: "
-                  + response.getStatusLine().getReasonPhrase();
-              return handleTikaServerRejects(description);
+
+            responseCode = response.getStatusLine().getStatusCode();
+            if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 204) {
+              tikaServerIs = response.getEntity().getContent();
+              try {
+                responseDs = new FileDestinationStorage();
+                IOUtils.copyLarge(tikaServerIs, responseDs.getOutputStream(), 0L, sp.writeLimit);
+                length = new Long(responseDs.getBinaryLength());
+              } finally {
+                tikaServerIs.close();
+              }
             } else {
-              resultCode = "TIKASERVERERROR";
-              description = "Tika Server failed to parse document with the following error: "
-                  + response.getStatusLine().getReasonPhrase();
-              return handleTikaServerError(description);
+              activities.noDocument();
+              if (responseCode == 422) {
+                resultCode = "TIKASERVERREJECTS";
+                description = "Tika Server rejected document with the following reason: "
+                    + response.getStatusLine().getReasonPhrase();
+                return handleTikaServerRejects(description);
+              } else {
+                resultCode = "TIKASERVERERROR";
+                description = "Tika Server failed to parse document with the following error: "
+                    + response.getStatusLine().getReasonPhrase();
+                return handleTikaServerError(description);
+              }
             }
-          }
 
-        } catch (IOException | ParseException e) {
-          resultCode = "TIKASERVERRESPONSEISSUE";
-          description = e.getMessage();
-          int rval;
-          if (e instanceof IOException) {
-            rval = handleTikaServerException((IOException) e);
-          } else {
-            rval = handleTikaServerException((ParseException) e);
+          } catch (IOException | ParseException e) {
+            resultCode = "TIKASERVERRESPONSEISSUE";
+            description = e.getMessage();
+            int rval;
+            if (e instanceof IOException) {
+              rval = handleTikaServerException((IOException) e);
+            } else {
+              rval = handleTikaServerException((ParseException) e);
+            }
+            if (rval == DOCUMENTSTATUS_REJECTED) {
+              activities.noDocument();
+            }
+            return rval;
           }
-          if (rval == DOCUMENTSTATUS_REJECTED) {
+
+          if (!activities.checkLengthIndexable(responseDs.getBinaryLength())) {
             activities.noDocument();
+            resultCode = activities.EXCLUDED_LENGTH;
+            description = "Downstream pipeline rejected document with length " + ds.getBinaryLength();
+            return DOCUMENTSTATUS_REJECTED;
           }
-          return rval;
-        }
 
-        if (!activities.checkLengthIndexable(ds.getBinaryLength())) {
-          activities.noDocument();
-          resultCode = activities.EXCLUDED_LENGTH;
-          description = "Downstream pipeline rejected document with length " + ds.getBinaryLength();
-          return DOCUMENTSTATUS_REJECTED;
+        } finally {
+          // Log the extraction processing
+          activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI, resultCode, description);
         }
 
-      } finally {
-        // Log the extraction processing
-        activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI, resultCode, description);
-      }
-
-      // Parsing complete!
-      // Create a copy of Repository Document
-      RepositoryDocument docCopy = document.duplicate();
-
-      // Get new stream length
-      long newBinaryLength = ds.getBinaryLength();
-      // Open new input stream
-      InputStream is = ds.getInputStream();
+        // Parsing complete!
+        // Create a copy of Repository Document
+        final RepositoryDocument docCopy = document.duplicate();
+
+        // Get new stream length
+        final long newBinaryLength = responseDs.getBinaryLength();
+        // Open new input stream
+        final InputStream is = responseDs.getInputStream();
 
-      try {
-        docCopy.setBinary(is, newBinaryLength);
+        try {
+          docCopy.setBinary(is, newBinaryLength);
 
-        // Set up all metadata from Tika. We may want to run this through a
-        // mapper eventually...
-        String[] metaNames = metadata.names();
-        for (String mName : metaNames) {
-          String value = metadata.get(mName);
-          if (sp.lowerNames()) {
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < mName.length(); i++) {
-              char ch = mName.charAt(i);
-              if (!Character.isLetterOrDigit(ch))
-                ch = '_';
-              else
-                ch = Character.toLowerCase(ch);
-              sb.append(ch);
+          // Set up all metadata from Tika. We may want to run this through a
+          // mapper eventually...
+          String[] metaNames = metadata.names();
+          for (String mName : metaNames) {
+            String value = metadata.get(mName);
+            if (sp.lowerNames()) {
+              StringBuilder sb = new StringBuilder();
+              for (int i = 0; i < mName.length(); i++) {
+                char ch = mName.charAt(i);
+                if (!Character.isLetterOrDigit(ch))
+                  ch = '_';
+                else
+                  ch = Character.toLowerCase(ch);
+                sb.append(ch);
+              }
+              mName = sb.toString();
             }
-            mName = sb.toString();
-          }
-          String target = sp.getMapping(mName);
-          if (target != null) {
-            docCopy.addField(target, value);
-          } else {
-            if (sp.keepAllMetadata()) {
-              docCopy.addField(mName, value);
+            String target = sp.getMapping(mName);
+            if (target != null) {
+              docCopy.addField(target, value);
+            } else {
+              if (sp.keepAllMetadata()) {
+                docCopy.addField(mName, value);
+              }
             }
           }
-        }
 
-        // Send new document downstream
-        return activities.sendDocument(documentURI, docCopy);
+          // Send new document downstream
+          return activities.sendDocument(documentURI, docCopy);
+        } finally {
+          is.close();
+        }
       } finally {
-        is.close();
+        if (responseDs != null) {
+          responseDs.close();
+        }
       }
     } finally {
       ds.close();


Reply via email to