Revision: 17426
http://sourceforge.net/p/gate/code/17426
Author: valyt
Date: 2014-02-26 12:15:19 +0000 (Wed, 26 Feb 2014)
Log Message:
-----------
Redesigned the batching implementation to work with a single shared byte buffer
and object output stream, which allows it to be compatible with the current
implementation in Mimir 4 (and 5).
Modified Paths:
--------------
mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java
Modified:
mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java
===================================================================
--- mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java	2014-02-26 11:58:22 UTC (rev 17425)
+++ mimir/branches/5.0/mimir-client/src/gate/mimir/index/MimirConnector.java	2014-02-26 12:15:19 UTC (rev 17426)
@@ -22,152 +22,50 @@
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URL;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.log4j.Logger;
+
/**
* Utility class that implements the client side of the Mimir RPC indexing
* protocol.
*/
public class MimirConnector {
- /**
- * Background runnable actually responsible for sending documents to the
- * remote server.
- */
- protected class DocumentPusher implements Runnable {
-
- public DocumentPusher() {
- byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
- lastWrite = System.currentTimeMillis();
- }
-
- /**
- * The maximum size of the {@link #byteBuffer}.
- */
- protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
-
- protected long lastWrite;
- /**
- * A byte buffer accumulating data to be sent to the remote server.
- */
- protected ByteArrayOutputStream byteBuffer;
-
- @Override
- public void run() {
- try {
- // The logic is: if there is a connectionInterval (positive value), then
- // check the state with the same interval, to make sure we flush the
- // data to the server, even if no more documents are submitted.
- // If the connection interval is negative, then every time there is
- // a document in the queue, it will immediately be sent to the server
- // so we don't need to time-out the poll.
- byte[] data = inputBuffer.poll(
- (connectionInterval > 0 ? connectionInterval : Integer.MAX_VALUE),
- TimeUnit.MILLISECONDS);
- while(data != END_OF_LIST) {
- try{
- if(data != null) {
- // if too much data, write the buffer
- if(data.length + byteBuffer.size() > BYTE_BUFFER_SIZE){
- writeBuffer(); // this will also empty (reset) the buffer
- }
- // add the current document to the buffer
- byteBuffer.write(data);
- }
- // if too long since last write, write the buffer
- if(System.currentTimeMillis() - lastWrite > connectionInterval) {
- writeBuffer();
- }
- } catch(IOException e) {
- // error communicating with the remote end point
- exception = e;
- }
- data = inputBuffer.poll(
- (connectionInterval > 0 ? connectionInterval : Integer.MAX_VALUE),
- TimeUnit.MILLISECONDS);
- }
- // we're closing
- if(byteBuffer.size() > 0) {
- try{
- writeBuffer();
- } catch(IOException e) {
- // error communicating with the remote end point
- exception = e;
- }
- }
- } catch(InterruptedException e) {
- if(closed) {
- // we're done
- } else {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
- * Writes the current contents of the byte buffer to the remote server.
- * @throws IOException
- */
- protected void writeBuffer() throws IOException {
- if(byteBuffer.size() > 0) {
- // first phase - call the indexUrl action to find out where to post the
- // data
- StringBuilder indexURLString = new StringBuilder(indexURL.toExternalForm());
- if(indexURLString.length() == 0) {
- throw new IllegalArgumentException("No index URL specified");
- }
- if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
- // add a slash if necessary
- indexURLString.append('/');
- }
- indexURLString.append("manage/indexUrl");
- StringBuilder postUrlBuilder = new StringBuilder();
-
- webUtils.getText(postUrlBuilder, indexURLString.toString());
-
- // second phase - post to the URL we were given
- webUtils.postData(postUrlBuilder.toString(), byteBuffer);
- byteBuffer.reset();
- }
-
- lastWrite = System.currentTimeMillis();
- }
- }
+ protected static Logger logger = Logger.getLogger(MimirConnector.class);
/**
- * Constant used to mark the end of input.
+ * The maximum size of the {@link #byteBuffer}.
*/
- protected static final byte[] END_OF_LIST = new byte[]{};
+ protected static final int BYTE_BUFFER_SIZE = 8 * 1024 *1024;
/**
- * The size (number of documents) of the {@link #inputBuffer}.
+ * Timer used to regularly check if there is any data to send to the remote
+ * server.
*/
- protected static final int INPUT_BUFFER_SIZE = 10;
+ protected Timer backgroundTimer;
/**
- * A queue holding serialised GATE documents waiting to be sent to the remote
- * server by the {@link DocumentPusher}.
+ * Has this connector been closed?
*/
- protected BlockingQueue<byte[]> inputBuffer;
+ protected volatile boolean closed = false;
/**
- * The background thread used to actually send the documents to the remote
- * server.
+ * The last time we sent data to the remote server
*/
- protected Thread pusherThread;
+ protected volatile long lastWrite;
/**
- * Has this connector been closed?
+ * A byte buffer accumulating data to be sent to the remote server.
*/
- protected volatile boolean closed = false;
+ protected ByteArrayOutputStream byteBuffer;
/**
- * If the background thread encounters a problem, the cause will be stored
- * here so that it can reported from the main thread at first opportunity
+ * An instance of {@link ObjectOutputStream} used to serialise document for
+ * transmission over the wire.
*/
- protected volatile IOException exception;
+ protected ObjectOutputStream objectOutputStream;
/**
* The name for the document feature used to hold the document URI.
@@ -198,19 +96,15 @@
*/
public static final int DEFAULT_CONNECTION_INTERVAL = -1;
- private static MimirConnector staticConnector;
-
- public MimirConnector(URL indexUrl, WebUtils webUtils) {
+ public MimirConnector(URL indexUrl, WebUtils webUtils) throws IOException {
this.indexURL = indexUrl;
this.webUtils = webUtils;
- inputBuffer = new ArrayBlockingQueue<>(INPUT_BUFFER_SIZE);
- DocumentPusher docPusher = new DocumentPusher();
- pusherThread = new Thread(docPusher,
- getClass().getName() + " document pusher");
- pusherThread.start();
+ byteBuffer = new ByteArrayOutputStream(BYTE_BUFFER_SIZE);
+ objectOutputStream = new ObjectOutputStream(byteBuffer);
+ lastWrite = System.currentTimeMillis();
}
- public MimirConnector(URL indexUrl) {
+ public MimirConnector(URL indexUrl) throws IOException {
this(indexUrl, new WebUtils());
}
@@ -237,31 +131,31 @@
*/
public void sendToMimir(Document doc, String documentURI) throws
IOException, InterruptedException {
if(closed) throw new IOException("This Mímir connector has been closed.");
-
- if(exception != null) {
- IOException throwable = new IOException("Exception in background thread",
- exception);
- exception = null;
- throw throwable;
- }
-
+
boolean uriFeatureWasSet = false;
Object oldUriFeatureValue = null;
- if(documentURI != null) {
+ if(documentURI != null && doc != null) {
// set the URI as a document feature, saving the old value (if any)
uriFeatureWasSet = doc.getFeatures().containsKey(MIMIR_URI_FEATURE);
oldUriFeatureValue = doc.getFeatures().get(MIMIR_URI_FEATURE);
doc.getFeatures().put(MIMIR_URI_FEATURE, documentURI);
}
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(doc);
- oos.close();
- inputBuffer.put(baos.toByteArray());
+ synchronized(this) {
+ if(doc != null){
+ objectOutputStream.writeObject(doc);
+ }
+ if(byteBuffer.size() > BYTE_BUFFER_SIZE) {
+ writeBuffer(); // this will also empty (reset) the buffer
+ }
+ // if too long since last write, write the buffer
+ if(System.currentTimeMillis() - lastWrite > connectionInterval) {
+ writeBuffer();
+ }
+ }
- if(documentURI != null) {
+ if(documentURI != null && doc != null) {
// reset the URI feature to the value it had (or didn't have) before
if(uriFeatureWasSet) {
doc.getFeatures().put(MIMIR_URI_FEATURE, oldUriFeatureValue);
@@ -272,6 +166,38 @@
}
/**
+ * Writes the current contents of the byte buffer to the remote server.
+ * @throws IOException
+ */
+ protected synchronized void writeBuffer() throws IOException {
+ if(byteBuffer.size() > 0) {
+ // first phase - call the indexUrl action to find out where to post the
+ // data
+ StringBuilder indexURLString = new StringBuilder(indexURL.toExternalForm());
+ if(indexURLString.length() == 0) {
+ throw new IllegalArgumentException("No index URL specified");
+ }
+ if(indexURLString.charAt(indexURLString.length() - 1) != '/') {
+ // add a slash if necessary
+ indexURLString.append('/');
+ }
+ indexURLString.append("manage/indexUrl");
+ StringBuilder postUrlBuilder = new StringBuilder();
+
+ webUtils.getText(postUrlBuilder, indexURLString.toString());
+
+ // second phase - post to the URL we were given
+ // close the object OS so that it writes its coda
+ objectOutputStream.close();
+ webUtils.postData(postUrlBuilder.toString(), byteBuffer);
+ byteBuffer.reset();
+ objectOutputStream = new ObjectOutputStream(byteBuffer);
+ }
+
+ lastWrite = System.currentTimeMillis();
+ }
+
+ /**
* Gets the current value for the connection interval (see
* {@link #setConnectionInterval(int)}).
* @return
@@ -291,8 +217,29 @@
* @param connectionInterval
*/
- public void setConnectionInterval(int connectionInterval) {
+ public synchronized void setConnectionInterval(int connectionInterval) {
this.connectionInterval = connectionInterval;
+ if(connectionInterval <= 0 && backgroundTimer != null) {
+ backgroundTimer.cancel();
+ } else {
+ if(backgroundTimer != null) {
+ backgroundTimer.cancel();
+ }
+ backgroundTimer = new Timer(getClass().getName() + " background timer");
+ // we set a timer task that regularly submits a null value. This causes
+ // the connector to flush any data that is getting too old.
+ backgroundTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ sendToMimir(null, null);
+ } catch(Exception e) {
+ // this should never happen
+ logger.error(MimirConnector.class.getName() + " internal error", e);
+ }
+ }
+ }, connectionInterval, connectionInterval);
+ }
}
/**
@@ -306,15 +253,12 @@
* waiting to notify the background thread of the termination.
*/
public void close() throws IOException, InterruptedException {
- if(exception != null){
- IOException throwable = new IOException(
- "Exception in background thread", exception);
- exception = null;
- throw throwable;
+ closed = true;
+ if(backgroundTimer != null){
+ backgroundTimer.cancel();
}
- closed = true;
- inputBuffer.put(END_OF_LIST);
- pusherThread.join();
+ // flush all cached content one last time
+ writeBuffer();
}
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
Flow-based real-time traffic analytics software. Cisco certified tool.
Monitor traffic, SLAs, QoS, Medianet, WAAS etc. with NetFlow Analyzer
Customize your own dashboards, set traffic alerts and generate reports.
Network behavioral analysis & security monitoring. All-in-one tool.
http://pubads.g.doubleclick.net/gampad/clk?id=126839071&iu=/4140/ostg.clktrk
_______________________________________________
GATE-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/gate-cvs