Revision: 3344
Author: ferguson.sebastian
Date: Mon Mar 1 08:33:36 2010
Log: Queued the messages in the sender to improve synchronization and fix a
bug caused when undo and redo were repeated rapidly.
http://code.google.com/p/power-architect/source/detail?r=3344
Modified:
/trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java
=======================================
---
/trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java
Fri Feb 26 15:05:34 2010
+++
/trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java
Mon Mar 1 08:33:36 2010
@@ -12,7 +12,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.prefs.Preferences;
import javax.swing.SwingUtilities;
@@ -546,67 +545,91 @@
// Contained classes
--------------------------------------------------------------------
- private AtomicBoolean persistingToServer = new AtomicBoolean();
+ private boolean persistingToServer = false;
/**
* Sends outgoing JSON
*/
private class Sender extends HttpMessageSender<JSONObject> {
- private JSONArray message;
+ private List<JSONArray> messageQueue;
+
+ /**
+ * This variable keeps track of the number of calls to flush that were
made while
+ * executing flush. It is used so that each flush call will be executed
individually.
+ * The method flush does not continue flushing until the queue is empty
because the
+ * last message in the queue may be incomplete, and transactions should
be sent
+ * atomically.
+ */
+ private int flushDepth = 0;
public Sender(HttpClient httpClient, SPServerInfo serverInfo, String
rootUUID) {
super(httpClient, serverInfo, rootUUID);
- message = new JSONArray();
+ messageQueue = new ArrayList<JSONArray>();
}
public void clear() {
- message = new JSONArray();
+ if (messageQueue.size() > 0) {
+ messageQueue.remove(0);
+ }
}
- private boolean flushAgain = false;
public void flush() throws SPPersistenceException {
try {
- if (persistingToServer.compareAndSet(false, true)) {
- flushAgain = false;
-
- URI serverURI = getServerURI();
- HttpPost postRequest = new HttpPost(serverURI);
-
- postRequest.setEntity(new
StringEntity(message.toString()));
-
postRequest.setHeader("Content-Type", "application/json");
- HttpUriRequest request = postRequest;
- clear();
-
- final JSONMessage response =
getHttpClient().execute(request, new JSONResponseHandler());
- runInForeground(new Runnable() {
- public void run() {
- try {
- if (response.isSuccessful()) {
- // Message was sent successfully and
accepted by the server.
- currentRevision = (new
JSONObject(response.getBody())).getInt("currentRevision");
- } else {
- // Message was sent successfully but
rejected by the server. We must rollback our
- // changes and update to the head
revision.
- logger.debug("Response
unsuccessful");
- throw new RuntimeException("Out of sync with
server");
- }
- } catch (JSONException e) {
- throw new RuntimeException(e);
- } finally {
- persistingToServer.set(false);
+ synchronized (this) {
+ if (persistingToServer) {
+ flushDepth++;
+ return;
+ } else {
+ if (messageQueue.size() > 0) {
+ persistingToServer = true;
+ } else {
+ // If flush was called but no message was
given,
+ // don't bother sending anything. (This is
not a
+ // special precaution, it does happen.)
+ persistingToServer = false;
+ return;
+ }
+ }
+ }
+
+ URI serverURI = getServerURI();
+ HttpPost postRequest = new HttpPost(serverURI);
+ postRequest.setEntity(new
StringEntity(messageQueue.get(0).toString()));
+ postRequest.setHeader("Content-Type", "application/json");
+ HttpUriRequest request = postRequest;
+ clear();
+ final JSONMessage response =
getHttpClient().execute(request, new JSONResponseHandler());
+
+ runInForeground(new Runnable() {
+ public void run() {
+ try {
+ if (response.isSuccessful()) {
+ // Message was sent successfully and accepted by
the server.
+ currentRevision = (new
JSONObject(response.getBody())).getInt("currentRevision");
+ } else {
+ // Message was sent successfully but rejected by
the server. We must rollback our
+ // changes and update to the head revision.
+ logger.debug("Response unsuccessful");
+ throw new RuntimeException("Out of sync with
server");
+ }
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ } finally {
+ synchronized (sender) {
+ persistingToServer = false;
try {
- if (flushAgain) flush();
+ if (flushDepth > 0) {
+ flushDepth--;
+ flush();
+ }
} catch (SPPersistenceException e) {
throw new RuntimeException(e);
}
- }
- }
- });
- } else {
- flushAgain = true;
- return;
- }
+ }
+ }
+ }
+ });
} catch (UnsupportedEncodingException e) {
throw new SPPersistenceException(null, e);
} catch (URISyntaxException e) {
@@ -617,11 +640,14 @@
throw new SPPersistenceException(null, e);
} catch (RuntimeException e) {
throw new SPPersistenceException(null, e);
- }
+ }
}
public void send(JSONObject content) throws
SPPersistenceException {
- message.put(content);
+ if
(content.toString().equals("{\"uuid\":null,\"method\":\"begin\"}")) {
+ messageQueue.add(new JSONArray());
+ }
+ messageQueue.get(messageQueue.size() - 1).put(content);
}
public URI getServerURI() throws URISyntaxException {
@@ -699,11 +725,13 @@
runInForeground(new Runnable() {
public void run() {
try {
- if (!persistingToServer.get()) {
- int newRevision =
json.getInt("currentRevision");
- if (currentRevision < newRevision)
{
- currentRevision = newRevision;
- jsonDecoder.decode(jsonArray);
+ synchronized (sender) {
+ if (!persistingToServer) {
+ int newRevision =
json.getInt("currentRevision");
+ if (currentRevision <
newRevision) {
+ currentRevision =
newRevision;
+
jsonDecoder.decode(jsonArray);
+ }
}
}
} catch (SPPersistenceException e) {