Revision: 17443
          http://sourceforge.net/p/gate/code/17443
Author:   valyt
Date:     2014-02-26 16:18:49 +0000 (Wed, 26 Feb 2014)
Log Message:
-----------
Merging changes to mimir-client from the 5.0 branch.

Modified Paths:
--------------
    mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
    mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
    mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
    mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java

Property Changed:
----------------
    mimir/trunk/

Index: mimir/trunk
===================================================================
--- mimir/trunk 2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk 2014-02-26 16:18:49 UTC (rev 17443)

Property changes on: mimir/trunk
___________________________________________________________________
Modified: svn:mergeinfo
## -1,4 +1,4 ##
 /mimir/branches/3.4:14623,14634-14643,14687
 /mimir/branches/4.0:15380-15383,15385-15386,15388
 /mimir/branches/4.x:14298-14348
-/mimir/branches/5.0:17034
+/mimir/branches/5.0:17034,17423-17426
\ No newline at end of property
Modified: mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java   2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java   2014-02-26 16:18:49 UTC (rev 17443)
@@ -18,35 +18,96 @@
 import gate.Document;
 import gate.mimir.tool.WebUtils;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.net.URL;
+import java.util.Timer;
+import java.util.TimerTask;
 
+import org.apache.log4j.Logger;
+
 /**
  * Utility class that implements the client side of the Mimir RPC indexing
  * protocol. 
  */
 public class MimirConnector {
+  
+  protected static Logger logger = Logger.getLogger(MimirConnector.class);
+  
+  /**
+   * The maximum size of the {@link #byteBuffer}.
+   */
+  protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
+  
+  /**
+   * Timer used to regularly check if there is any data to send to the remote 
+   * server.
+   */
+  protected Timer backgroundTimer;
+  
+  /**
+   * Has this connector been closed?
+   */
+  protected volatile boolean closed = false;
+  
+  /**
+   * The last time we sent data to the remote server
+   */
+  protected volatile long lastWrite;
+  
+  /**
+   * A byte buffer accumulating data to be sent to the remote server.
+   */
+  protected ByteArrayOutputStream byteBuffer;
+  
+  /**
+   * An instance of {@link ObjectOutputStream} used to serialise document for
+   * transmission over the wire.
+   */
+  protected ObjectOutputStream objectOutputStream;
+  
+  /**
+   * The name for the document feature used to hold the document URI.
+   */
   public static final String MIMIR_URI_FEATURE = "gate.mimir.uri";
   
-  private WebUtils webUtils;
   
-  private static MimirConnector staticConnector;
   
-  public MimirConnector(WebUtils webUtils) {
+  protected WebUtils webUtils;
+  
+  /**
+   * The number of milliseconds between connections to the remote server. All
+   * documents submitted for indexing are locally cached and get sent to the 
+   * server in batches, at intervals defined by this value.
+   */
+  private int connectionInterval = -1;
+  
+  /**
+   * The URL of the mimir index that is to receive the document.  This would 
+   * typically be of the form 
+   * <code>http://server:port/mimir/&lt;index UUID&gt;/</code>.
+   */
+  protected URL indexURL;
+  
+  /**
+   * The default value for the connection interval (see 
+   * {@link #setConnectionInterval(int)}).
+   */
+  public static final int DEFAULT_CONNECTION_INTERVAL = -1;
+  
+  public MimirConnector(URL indexUrl, WebUtils webUtils) throws IOException {
+    this.indexURL = indexUrl;
     this.webUtils = webUtils;
+    byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
+    objectOutputStream = new ObjectOutputStream(byteBuffer);
+    lastWrite = System.currentTimeMillis();
   }
   
-  public MimirConnector() {
-    this(new WebUtils());
+  public MimirConnector(URL indexUrl) throws IOException {
+    this(indexUrl, new WebUtils());
   }
   
-  public static MimirConnector defaultConnector() {
-    if(staticConnector == null) {
-      staticConnector = new MimirConnector();
-    }
-    return staticConnector;
-  }
-  
   /**
    * Pass the given GATE document to the Mimir index at the given URL for
    * indexing.  The document should match the expectations of the Mimir index
@@ -63,38 +124,38 @@
    * @param indexURL the URL of the mimir index that is to receive the
    *         document.  This would typically be of the form
    *         <code>http://server:port/mimir/&lt;index UUID&gt;/</code>.
-   * @throws IOException if any error occurs communicating with the Mimir
+   * @throws IOException if any error has occurred communicating with the Mímir
    *         service.
+   * @throws InterruptedException if the current thread is interrupted while
+   * waiting to submit the document to the input queue.
    */
-  public void sendToMimir(Document doc, String documentURI,
-          URL indexURL) throws IOException {
+  public void sendToMimir(Document doc, String documentURI) throws 
IOException, InterruptedException {
+    if(closed) throw new IOException("This Mímir connector has been closed.");
+
     boolean uriFeatureWasSet = false;
     Object oldUriFeatureValue = null;
 
-    if(documentURI != null) {
+    if(documentURI != null && doc != null) {
       // set the URI as a document feature, saving the old value (if any)
       uriFeatureWasSet = doc.getFeatures().containsKey(MIMIR_URI_FEATURE);
       oldUriFeatureValue = doc.getFeatures().get(MIMIR_URI_FEATURE);
       doc.getFeatures().put(MIMIR_URI_FEATURE, documentURI);
     }
-    // first phase - call the indexUrl action to find out where to post the
-    // data
-    StringBuilder indexURLString = new 
StringBuilder(indexURL.toExternalForm());
-    if(indexURLString.length() == 0) {
-      throw new IllegalArgumentException("No index URL specified");
+    
+    synchronized(this) {
+      if(doc != null){
+        objectOutputStream.writeObject(doc);
+      }
+      if(byteBuffer.size() > BYTE_BUFFER_SIZE) {
+        writeBuffer(); // this will also empty (reset) the buffer
+      }
+      // if too long since last write, write the buffer
+      if(System.currentTimeMillis() - lastWrite > connectionInterval) {
+        writeBuffer();
+      }
     }
-    if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
-      // add a slash if necessary
-      indexURLString.append('/');
-    }
-    indexURLString.append("manage/indexUrl");
-    StringBuilder postUrlBuilder = new StringBuilder();
-    webUtils.getText(postUrlBuilder, indexURLString.toString());
 
-    // second phase - post to the URL we were given
-    webUtils.postObject(postUrlBuilder.toString(), doc);
-
-    if(documentURI != null) {
+    if(documentURI != null && doc != null) {
       // reset the URI feature to the value it had (or didn't have) before
       if(uriFeatureWasSet) {
         doc.getFeatures().put(MIMIR_URI_FEATURE, oldUriFeatureValue);
@@ -103,5 +164,101 @@
       }
     }
   }
+  
+  /**
+   * Writes the current contents of the byte buffer to the remote server. 
+   * @throws IOException 
+   */
+  protected synchronized void writeBuffer() throws IOException {
+    if(byteBuffer.size() > 0) {
+      // first phase - call the indexUrl action to find out where to post the
+      // data
+      StringBuilder indexURLString = new 
StringBuilder(indexURL.toExternalForm());
+      if(indexURLString.length() == 0) {
+        throw new IllegalArgumentException("No index URL specified");
+      }
+      if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
+        // add a slash if necessary
+        indexURLString.append('/');
+      }
+      indexURLString.append("manage/indexUrl");
+      StringBuilder postUrlBuilder = new StringBuilder();
+      
+      webUtils.getText(postUrlBuilder, indexURLString.toString());
+
+      // second phase - post to the URL we were given
+      // close the object OS so that it writes its coda
+      objectOutputStream.close();
+      webUtils.postData(postUrlBuilder.toString(), byteBuffer);
+      byteBuffer.reset();
+      objectOutputStream = new ObjectOutputStream(byteBuffer);
+    }
+    
+    lastWrite = System.currentTimeMillis();
+  }
+  
+  /**
+   * Gets the current value for the connection interval (see 
+   * {@link #setConnectionInterval(int)}). 
+   * @return
+   */
+  public int getConnectionInterval() {
+    return connectionInterval;
+  }
+
+  
+  /**
+   * Sets the number of milliseconds between connections to the remote server. 
+   * All documents submitted for indexing are locally cached and get sent to the 
+   * server in batches, at intervals defined by this value. Negative values mean
+   * that no local caching should take place, and documents should be sent to 
+   * the remote server as soon as possible. Defaults to 
+   * {@value #DEFAULT_CONNECTION_INTERVAL}.
+
+   * @param connectionInterval
+   */
+  public synchronized void setConnectionInterval(int connectionInterval) {
+    this.connectionInterval = connectionInterval;
+    if(connectionInterval <= 0 && backgroundTimer != null) {
+      backgroundTimer.cancel();
+    } else {
+      if(backgroundTimer != null) {
+        backgroundTimer.cancel();
+      }
+      backgroundTimer = new Timer(getClass().getName() +  " background timer");
+      // we set a timer task that regularly submits a null value. This causes
+      // the connector to flush any data that is getting too old.
+      backgroundTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            sendToMimir(null, null);
+          } catch(Exception e) {
+            // this should never happen
+            logger.error(MimirConnector.class.getName() + " internal error", 
e);
+          }
+        }
+      }, connectionInterval, connectionInterval);
+    }
+  }
+
+  /**
+   * Notifies this Mímir connector that no more documents remain to be sent.
+   * At this point any locally cached documents are submitted to the remote
+   * server, after which the remote connection is closed. This method then 
+   * returns.
+   * @throws IOException if the background thread used for the communication 
+   * with the remote end point has encountered a problem.
+   * @throws InterruptedException if the current thread is interrupted while
+   * waiting to notify the background thread of the termination.
+   */
+  public void close() throws IOException, InterruptedException {
+    closed = true;
+    if(backgroundTimer != null){
+      backgroundTimer.cancel();
+    }
+    // flush all cached content one last time
+    writeBuffer();
+  }
 }
 

Modified: mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java  2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java  2014-02-26 16:18:49 UTC (rev 17443)
@@ -14,20 +14,18 @@
  */
 package gate.mimir.index;
 
-import java.io.IOException;
-import java.net.URL;
-
-import gate.Resource;
 import gate.creole.AbstractLanguageAnalyser;
 import gate.creole.ExecutionException;
-import gate.creole.ResourceInstantiationException;
 import gate.creole.metadata.CreoleParameter;
 import gate.creole.metadata.CreoleResource;
 import gate.creole.metadata.Optional;
 import gate.creole.metadata.RunTime;
 import gate.mimir.tool.WebUtils;
+import gate.util.GateRuntimeException;
 
+import java.net.URL;
 
+
 /**
  * A simple PR for sending documents to a Mímir index.
  */
@@ -35,6 +33,8 @@
  name="Mímir Indexing PR")
 public class MimirIndexingPR extends AbstractLanguageAnalyser {
 
+  private static final long serialVersionUID = 3291873032301133998L;
+
   private URL mimirIndexUrl;
   
   private String mimirUsername;
@@ -83,6 +83,12 @@
 
   @Override
   public void cleanup() {
+    try {
+      mimirConnector.close();
+    } catch(Exception e) {
+      throw new GateRuntimeException("Execption while closing Mímir 
connector", 
+          e);
+    }
     mimirConnector = null;
   }
 
@@ -92,15 +98,14 @@
       if(mimirConnector == null) {
         // first run or config has changed: [re-]create
         if(mimirUsername != null && mimirUsername.length() > 0) {
-          mimirConnector = new MimirConnector(
+          mimirConnector = new MimirConnector(mimirIndexUrl,
             new WebUtils(mimirUsername, mimirPassword));          
         } else {
-          mimirConnector = new MimirConnector();  
+          mimirConnector = new MimirConnector(mimirIndexUrl);  
         }
       }
-      
-      mimirConnector.sendToMimir(getDocument(), null, mimirIndexUrl);
-    } catch(IOException e) {
+      mimirConnector.sendToMimir(getDocument(), null);
+    } catch(Exception e) {
       throw new ExecutionException(
         "Error communicating with the Mímir server", e);
     }

Modified: mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java       2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java       2014-02-26 16:18:49 UTC (rev 17443)
@@ -19,21 +19,14 @@
 import gate.mimir.search.query.Binding;
 import gate.mimir.search.query.QueryNode;
 import gate.mimir.tool.WebUtils;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import it.unimi.dsi.fastutil.doubles.DoubleBigArrayBigList;
 import it.unimi.dsi.fastutil.doubles.DoubleBigList;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
-import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
 import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
 import it.unimi.dsi.fastutil.longs.LongBigList;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.CookieHandler;
 import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.List;

Modified: mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java  2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java  2014-02-26 16:18:49 UTC (rev 17443)
@@ -14,6 +14,7 @@
  */
 package gate.mimir.tool;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -536,6 +537,70 @@
   
   /**
    * Calls a web service action (i.e. it connects to a URL) using the POST HTTP
+   * method, sending the given bytes as the request
+   * body.  The request is sent using chunked transfer encoding, and the
+   * request's Content-Type is set to application/octet-stream.  If the
+   * connection fails, for whatever reason, or the response code is different
+   * from {@link HttpURLConnection#HTTP_OK}, then an IOException is raised.
+   * This method will drain (and discard) all content available from either the
+   * input and error streams of the resulting connection (which should permit
+   * connection keepalives).
+   *   
+   * @param baseUrl the constant part of the URL to be accessed.
+   * @param data a {@link ByteArrayOutputStream} containing the data to be 
+   * written. Its {@link ByteArrayOutputStream#writeTo(OutputStream)} method 
+   * will be called causing it to write its data to the output connection.
+   * @param params an array of String values, that contain an alternation of
+   * parameter name, and parameter values.
+   * @throws IOException if the connection fails.
+   */
+  public void postData(String baseUrl, ByteArrayOutputStream data,
+          String... params) throws IOException {
+    URL actionUrl = new URL(buildUrl(baseUrl, params));
+    URLConnection conn = openURLConnection(actionUrl);
+    if(conn instanceof HttpURLConnection) {
+      HttpURLConnection httpConn = (HttpURLConnection)conn;
+      try {
+        // enable output and set HTTP method
+        httpConn.setDoOutput(true);
+        httpConn.setRequestMethod("POST");
+        // turn on chunking (we don't want to buffer the output if we don't
+        // have to). 0 means use default chunk size.
+        httpConn.setChunkedStreamingMode(0);
+        // don't time out
+        httpConn.setConnectTimeout(0);
+        httpConn.setReadTimeout(0);
+        
+        // MIME type (defaults to form encoded, so must change it)
+        httpConn.setRequestProperty("Content-Type", 
"application/octet-stream");
+
+        // connect and send the object
+        httpConn.connect();
+        
+        OutputStream httpOutputStream = httpConn.getOutputStream();
+        data.writeTo(httpOutputStream);
+        httpOutputStream.close();
+        
+        int code = httpConn.getResponseCode();
+        if(code != HttpURLConnection.HTTP_OK) {
+          // try to get more details
+          String message = httpConn.getResponseMessage();
+          throw new IOException(code
+                  + (message != null ? " (" + message + ")" : "")
+                  + " Remote connection failed.");
+        }
+      } finally {
+        // make sure the connection is drained, to allow connection keepalive
+        drainConnection(httpConn);
+      }
+    } else {
+      throw new IOException("Connection received is not HTTP!"
+              + " Connection class: " + conn.getClass().getCanonicalName());
+    }
+  }  
+  
+  /**
+   * Calls a web service action (i.e. it connects to a URL) using the POST HTTP
    * method, sending the given object in Java serialized format as the request
    * body.  The request is sent using chunked transfer encoding, and the
    * request's Content-Type is set to application/octet-stream.  If the

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
Flow-based real-time traffic analytics software. Cisco certified tool.
Monitor traffic, SLAs, QoS, Medianet, WAAS etc. with NetFlow Analyzer
Customize your own dashboards, set traffic alerts and generate reports.
Network behavioral analysis & security monitoring. All-in-one tool.
http://pubads.g.doubleclick.net/gampad/clk?id=126839071&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs

Reply via email to