Revision: 17443
http://sourceforge.net/p/gate/code/17443
Author: valyt
Date: 2014-02-26 16:18:49 +0000 (Wed, 26 Feb 2014)
Log Message:
-----------
Merging changes to mimir-client from the 5.0 branch.
Modified Paths:
--------------
mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java
Property Changed:
----------------
mimir/trunk/
Index: mimir/trunk
===================================================================
--- mimir/trunk 2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk 2014-02-26 16:18:49 UTC (rev 17443)
Property changes on: mimir/trunk
___________________________________________________________________
Modified: svn:mergeinfo
## -1,4 +1,4 ##
/mimir/branches/3.4:14623,14634-14643,14687
/mimir/branches/4.0:15380-15383,15385-15386,15388
/mimir/branches/4.x:14298-14348
-/mimir/branches/5.0:17034
+/mimir/branches/5.0:17034,17423-17426
\ No newline at end of property
Modified: mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/index/MimirConnector.java
2014-02-26 16:18:49 UTC (rev 17443)
@@ -18,35 +18,96 @@
import gate.Document;
import gate.mimir.tool.WebUtils;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.net.URL;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.log4j.Logger;
+
/**
* Utility class that implements the client side of the Mimir RPC indexing
* protocol.
*/
public class MimirConnector {
+
+ protected static Logger logger = Logger.getLogger(MimirConnector.class);
+
+ /**
+ * The maximum size of the {@link #byteBuffer}.
+ */
+ protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
+
+ /**
+ * Timer used to regularly check if there is any data to send to the remote
+ * server.
+ */
+ protected Timer backgroundTimer;
+
+ /**
+ * Has this connector been closed?
+ */
+ protected volatile boolean closed = false;
+
+ /**
+ * The last time we sent data to the remote server
+ */
+ protected volatile long lastWrite;
+
+ /**
+ * A byte buffer accumulating data to be sent to the remote server.
+ */
+ protected ByteArrayOutputStream byteBuffer;
+
+ /**
+ * An instance of {@link ObjectOutputStream} used to serialise documents for
+ * transmission over the wire.
+ */
+ protected ObjectOutputStream objectOutputStream;
+
+ /**
+ * The name for the document feature used to hold the document URI.
+ */
public static final String MIMIR_URI_FEATURE = "gate.mimir.uri";
- private WebUtils webUtils;
- private static MimirConnector staticConnector;
- public MimirConnector(WebUtils webUtils) {
+ protected WebUtils webUtils;
+
+ /**
+ * The number of milliseconds between connections to the remote server. All
+ * documents submitted for indexing are locally cached and get sent to the
+ * server in batches, at intervals defined by this value.
+ */
+ private int connectionInterval = -1;
+
+ /**
+ * The URL of the mimir index that is to receive the document. This would
+ * typically be of the form
+ * <code>http://server:port/mimir/&lt;index UUID&gt;/</code>.
+ */
+ protected URL indexURL;
+
+ /**
+ * The default value for the connection interval (see
+ * {@link #setConnectionInterval(int)}).
+ */
+ public static final int DEFAULT_CONNECTION_INTERVAL = -1;
+
+ public MimirConnector(URL indexUrl, WebUtils webUtils) throws IOException {
+ this.indexURL = indexUrl;
this.webUtils = webUtils;
+ byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
+ objectOutputStream = new ObjectOutputStream(byteBuffer);
+ lastWrite = System.currentTimeMillis();
}
- public MimirConnector() {
- this(new WebUtils());
+ public MimirConnector(URL indexUrl) throws IOException {
+ this(indexUrl, new WebUtils());
}
- public static MimirConnector defaultConnector() {
- if(staticConnector == null) {
- staticConnector = new MimirConnector();
- }
- return staticConnector;
- }
-
/**
* Pass the given GATE document to the Mimir index at the given URL for
* indexing. The document should match the expectations of the Mimir index
@@ -63,38 +124,38 @@
* @param indexURL the URL of the mimir index that is to receive the
* document. This would typically be of the form
* <code>http://server:port/mimir/<index UUID>/</code>.
- * @throws IOException if any error occurs communicating with the Mimir
+ * @throws IOException if any error has occurred communicating with the Mímir
* service.
+ * @throws InterruptedException if the current thread is interrupted while
+ * waiting to submit the document to the input queue.
*/
- public void sendToMimir(Document doc, String documentURI,
- URL indexURL) throws IOException {
+ public void sendToMimir(Document doc, String documentURI) throws
IOException, InterruptedException {
+ if(closed) throw new IOException("This Mímir connector has been closed.");
+
boolean uriFeatureWasSet = false;
Object oldUriFeatureValue = null;
- if(documentURI != null) {
+ if(documentURI != null && doc != null) {
// set the URI as a document feature, saving the old value (if any)
uriFeatureWasSet = doc.getFeatures().containsKey(MIMIR_URI_FEATURE);
oldUriFeatureValue = doc.getFeatures().get(MIMIR_URI_FEATURE);
doc.getFeatures().put(MIMIR_URI_FEATURE, documentURI);
}
- // first phase - call the indexUrl action to find out where to post the
- // data
- StringBuilder indexURLString = new
StringBuilder(indexURL.toExternalForm());
- if(indexURLString.length() == 0) {
- throw new IllegalArgumentException("No index URL specified");
+
+ synchronized(this) {
+ if(doc != null){
+ objectOutputStream.writeObject(doc);
+ }
+ if(byteBuffer.size() > BYTE_BUFFER_SIZE) {
+ writeBuffer(); // this will also empty (reset) the buffer
+ }
+ // if too long since last write, write the buffer
+ if(System.currentTimeMillis() - lastWrite > connectionInterval) {
+ writeBuffer();
+ }
}
- if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
- // add a slash if necessary
- indexURLString.append('/');
- }
- indexURLString.append("manage/indexUrl");
- StringBuilder postUrlBuilder = new StringBuilder();
- webUtils.getText(postUrlBuilder, indexURLString.toString());
- // second phase - post to the URL we were given
- webUtils.postObject(postUrlBuilder.toString(), doc);
-
- if(documentURI != null) {
+ if(documentURI != null && doc != null) {
// reset the URI feature to the value it had (or didn't have) before
if(uriFeatureWasSet) {
doc.getFeatures().put(MIMIR_URI_FEATURE, oldUriFeatureValue);
@@ -103,5 +164,101 @@
}
}
}
+
+ /**
+  * Writes the current contents of the byte buffer to the remote server.
+  * The address to post to is obtained by calling the
+  * <code>manage/indexUrl</code> action of the index; the buffered bytes are
+  * then posted to that address, the buffer is reset and a new
+  * {@link ObjectOutputStream} is started on top of it. Nothing is sent when
+  * no document data has been buffered since the previous write. The
+  * {@link #lastWrite} timestamp is updated whenever this method returns
+  * normally.
+  *
+  * @throws IOException if the communication with the remote server fails.
+  */
+ protected synchronized void writeBuffer() throws IOException {
+   // Creating an ObjectOutputStream immediately writes the 4-byte
+   // serialization stream header (magic number and version, two shorts) to
+   // the underlying buffer, so the buffer is never completely empty. Only
+   // contact the server when something follows that header; otherwise every
+   // timer tick and every close() would post an empty stream.
+   final int streamHeaderSize = 4;
+   if(byteBuffer.size() > streamHeaderSize) {
+     // first phase - call the indexUrl action to find out where to post the
+     // data
+     StringBuilder indexURLString =
+         new StringBuilder(indexURL.toExternalForm());
+     if(indexURLString.length() == 0) {
+       throw new IllegalArgumentException("No index URL specified");
+     }
+     if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
+       // add a slash if necessary
+       indexURLString.append('/');
+     }
+     indexURLString.append("manage/indexUrl");
+     StringBuilder postUrlBuilder = new StringBuilder();
+
+     webUtils.getText(postUrlBuilder, indexURLString.toString());
+
+     // second phase - post to the URL we were given
+     // close the object OS so that it writes its coda
+     objectOutputStream.close();
+     webUtils.postData(postUrlBuilder.toString(), byteBuffer);
+     // start over with an empty buffer and a fresh object stream
+     byteBuffer.reset();
+     objectOutputStream = new ObjectOutputStream(byteBuffer);
+   }
+
+   lastWrite = System.currentTimeMillis();
+ }
+
+ /**
+ * Gets the current value for the connection interval (see
+ * {@link #setConnectionInterval(int)}).
+ * @return the number of milliseconds between connections to the remote
+ * server, as last passed to {@link #setConnectionInterval(int)}, or the
+ * initial value of -1 if it was never set.
+ */
+ public int getConnectionInterval() {
+ return connectionInterval;
+ }
+
+
+ /**
+  * Sets the number of milliseconds between connections to the remote server.
+  * All documents submitted for indexing are locally cached and get sent to
+  * the server in batches, at intervals defined by this value. Negative
+  * values mean that no local caching should take place, and documents should
+  * be sent to the remote server as soon as possible. Defaults to
+  * {@value #DEFAULT_CONNECTION_INTERVAL}.
+  * <p>
+  * Any previously scheduled background flush task is cancelled. A new one is
+  * scheduled only for strictly positive intervals, because
+  * {@link Timer#schedule(TimerTask, long, long)} rejects a negative delay or
+  * a non-positive period with an {@link IllegalArgumentException}.
+  *
+  * @param connectionInterval the interval, in milliseconds, between
+  * connections to the remote server.
+  */
+ public synchronized void setConnectionInterval(int connectionInterval) {
+   this.connectionInterval = connectionInterval;
+   // the old timer (if any) runs with the old period: always discard it
+   if(backgroundTimer != null) {
+     backgroundTimer.cancel();
+     backgroundTimer = null;
+   }
+   // no periodic flush is needed when caching is disabled, and none must be
+   // started on a connector that was already closed (nothing would ever
+   // cancel it)
+   if(connectionInterval > 0 && !closed) {
+     backgroundTimer = new Timer(getClass().getName() + " background timer");
+     // we set a timer task that regularly submits a null value. This causes
+     // the connector to flush any data that is getting too old.
+     backgroundTimer.schedule(new TimerTask() {
+       @Override
+       public void run() {
+         try {
+           sendToMimir(null, null);
+         } catch(Exception e) {
+           // this should never happen
+           logger.error(MimirConnector.class.getName() + " internal error",
+               e);
+         }
+       }
+     }, connectionInterval, connectionInterval);
+   }
+ }
+
+ /**
+ * Notifies this Mímir connector that no more documents remain to be sent.
+ * The connector is marked as closed, so that later calls to
+ * {@link #sendToMimir(Document, String)} fail with an IOException; the
+ * background timer, if one was started, is cancelled; and any locally cached
+ * documents are submitted to the remote server before this method returns.
+ * @throws IOException if sending the cached documents to the remote server
+ * fails.
+ * @throws InterruptedException NOTE(review): nothing in the visible body of
+ * this method throws this exception; presumably it is kept in the signature
+ * for the benefit of existing callers - confirm.
+ */
+ public void close() throws IOException, InterruptedException {
+ closed = true;
+ if(backgroundTimer != null){
+ backgroundTimer.cancel();
+ }
+ // flush all cached content one last time
+ writeBuffer();
+ }
}
Modified: mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/index/MimirIndexingPR.java
2014-02-26 16:18:49 UTC (rev 17443)
@@ -14,20 +14,18 @@
*/
package gate.mimir.index;
-import java.io.IOException;
-import java.net.URL;
-
-import gate.Resource;
import gate.creole.AbstractLanguageAnalyser;
import gate.creole.ExecutionException;
-import gate.creole.ResourceInstantiationException;
import gate.creole.metadata.CreoleParameter;
import gate.creole.metadata.CreoleResource;
import gate.creole.metadata.Optional;
import gate.creole.metadata.RunTime;
import gate.mimir.tool.WebUtils;
+import gate.util.GateRuntimeException;
+import java.net.URL;
+
/**
* A simple PR for sending documents to a Mímir index.
*/
@@ -35,6 +33,8 @@
name="Mímir Indexing PR")
public class MimirIndexingPR extends AbstractLanguageAnalyser {
+ private static final long serialVersionUID = 3291873032301133998L;
+
private URL mimirIndexUrl;
private String mimirUsername;
@@ -83,6 +83,12 @@
+ /**
+ * Closes the Mímir connector, if one has been created, and drops the
+ * reference to it so that the next execution creates a new one.
+ * @throws GateRuntimeException if closing the connector fails.
+ */
@Override
public void cleanup() {
+ // cleanup() can run before execute() ever created a connector
+ if(mimirConnector == null) return;
+ try {
+ mimirConnector.close();
+ } catch(Exception e) {
+ throw new GateRuntimeException(
+ "Exception while closing Mímir connector", e);
+ } finally {
+ // forget the connector even if closing failed: it is marked as closed
+ // and can no longer accept documents
mimirConnector = null;
+ }
}
@@ -92,15 +98,14 @@
if(mimirConnector == null) {
// first run or config has changed: [re-]create
if(mimirUsername != null && mimirUsername.length() > 0) {
- mimirConnector = new MimirConnector(
+ mimirConnector = new MimirConnector(mimirIndexUrl,
new WebUtils(mimirUsername, mimirPassword));
} else {
- mimirConnector = new MimirConnector();
+ mimirConnector = new MimirConnector(mimirIndexUrl);
}
}
-
- mimirConnector.sendToMimir(getDocument(), null, mimirIndexUrl);
- } catch(IOException e) {
+ mimirConnector.sendToMimir(getDocument(), null);
+ } catch(Exception e) {
throw new ExecutionException(
"Error communicating with the Mímir server", e);
}
Modified: mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
2014-02-26 16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/search/RemoteQueryRunner.java
2014-02-26 16:18:49 UTC (rev 17443)
@@ -19,21 +19,14 @@
import gate.mimir.search.query.Binding;
import gate.mimir.search.query.QueryNode;
import gate.mimir.tool.WebUtils;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.doubles.DoubleBigArrayBigList;
import it.unimi.dsi.fastutil.doubles.DoubleBigList;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
-import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongBigArrayBigList;
import it.unimi.dsi.fastutil.longs.LongBigList;
import java.io.IOException;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.CookieHandler;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
Modified: mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java
===================================================================
--- mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java 2014-02-26
16:08:25 UTC (rev 17442)
+++ mimir/trunk/mimir-client/src/gate/mimir/tool/WebUtils.java 2014-02-26
16:18:49 UTC (rev 17443)
@@ -14,6 +14,7 @@
*/
package gate.mimir.tool;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -536,6 +537,70 @@
/**
* Calls a web service action (i.e. it connects to a URL) using the POST HTTP
+ * method, sending the given bytes as the request
+ * body. The request is sent using chunked transfer encoding, and the
+ * request's Content-Type is set to application/octet-stream. If the
+ * connection fails, for whatever reason, or the response code is different
+ * from {@link HttpURLConnection#HTTP_OK}, then an IOException is raised.
+ * This method will drain (and discard) all content available from both the
+ * input and error streams of the resulting connection (which should permit
+ * connection keepalives).
+ *
+ * @param baseUrl the constant part of the URL to be accessed.
+ * @param data a {@link ByteArrayOutputStream} containing the data to be
+ * written. Its {@link ByteArrayOutputStream#writeTo(OutputStream)} method
+ * will be called causing it to write its data to the output connection.
+ * @param params an array of String values, that contain an alternation of
+ * parameter name, and parameter values.
+ * @throws IOException if the connection fails.
+ */
+ public void postData(String baseUrl, ByteArrayOutputStream data,
+     String... params) throws IOException {
+   URLConnection connection =
+       openURLConnection(new URL(buildUrl(baseUrl, params)));
+   // anything other than an HTTP connection cannot be handled here
+   if(!(connection instanceof HttpURLConnection)) {
+     throw new IOException("Connection received is not HTTP!"
+         + " Connection class: " + connection.getClass().getCanonicalName());
+   }
+
+   HttpURLConnection http = (HttpURLConnection)connection;
+   try {
+     // configure a POST request with a body
+     http.setDoOutput(true);
+     http.setRequestMethod("POST");
+     // stream the body in chunks rather than buffering it; 0 selects the
+     // default chunk size
+     http.setChunkedStreamingMode(0);
+     // never time out, neither while connecting nor while reading
+     http.setConnectTimeout(0);
+     http.setReadTimeout(0);
+     // the default MIME type is form encoded, which is not what we send
+     http.setRequestProperty("Content-Type", "application/octet-stream");
+
+     // open the connection and copy the buffered bytes into the request body
+     http.connect();
+     OutputStream requestBody = http.getOutputStream();
+     data.writeTo(requestBody);
+     requestBody.close();
+
+     // anything but 200 OK is reported as a failure, with the server's
+     // reason phrase when one is available
+     int responseCode = http.getResponseCode();
+     if(responseCode != HttpURLConnection.HTTP_OK) {
+       String reason = http.getResponseMessage();
+       String detail = (reason == null) ? "" : " (" + reason + ")";
+       throw new IOException(responseCode + detail
+           + " Remote connection failed.");
+     }
+   } finally {
+     // consume whatever the server sent back, to allow connection keepalive
+     drainConnection(http);
+   }
+ }
+
+ /**
+ * Calls a web service action (i.e. it connects to a URL) using the POST HTTP
* method, sending the given object in Java serialized format as the request
* body. The request is sent using chunked transfer encoding, and the
* request's Content-Type is set to application/octet-stream. If the
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Flow-based real-time traffic analytics software. Cisco certified tool.
Monitor traffic, SLAs, QoS, Medianet, WAAS etc. with NetFlow Analyzer
Customize your own dashboards, set traffic alerts and generate reports.
Network behavioral analysis & security monitoring. All-in-one tool.
http://pubads.g.doubleclick.net/gampad/clk?id=126839071&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs