Revision: 3344
Author: ferguson.sebastian
Date: Mon Mar  1 08:33:36 2010
Log: Queued the messages in the sender to improve synchronization and fix a bug caused when undo and redo were repeated rapidly.
http://code.google.com/p/power-architect/source/detail?r=3344

Modified:
 /trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java

=======================================
--- /trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java Fri Feb 26 15:05:34 2010 +++ /trunk/src/ca/sqlpower/architect/enterprise/ArchitectClientSideSession.java Mon Mar 1 08:33:36 2010
@@ -12,7 +12,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.prefs.Preferences;

 import javax.swing.SwingUtilities;
@@ -546,67 +545,91 @@

// Contained classes --------------------------------------------------------------------

-       private AtomicBoolean persistingToServer = new AtomicBoolean();
+       private boolean persistingToServer = false;

        /**
         * Sends outgoing JSON
         */
        private class Sender extends HttpMessageSender<JSONObject> {

-               private JSONArray message;
+               private List<JSONArray> messageQueue;
+
+               /**
+ * This variable keeps track of the number of calls to flush that were made while + * executing flush. It is used so that each flush call will be executed individually. + * The method flush does not continue flushing until the queue is empty because the + * last message in the queue may be incomplete, and transactions should be sent
+                * atomically.
+                */
+               private int flushDepth = 0;

public Sender(HttpClient httpClient, SPServerInfo serverInfo, String rootUUID) {
                        super(httpClient, serverInfo, rootUUID);
-                       message = new JSONArray();
+                       messageQueue = new ArrayList<JSONArray>();
                }

                public void clear() {
-                       message = new JSONArray();
+                       if (messageQueue.size() > 0) {
+                           messageQueue.remove(0);
+                       }
                }

-               private boolean flushAgain = false;
                public void flush() throws SPPersistenceException {
                    try {
-                       if (persistingToServer.compareAndSet(false, true)) {
-                       flushAgain = false;
-
-                           URI serverURI = getServerURI();
-                    HttpPost postRequest = new HttpPost(serverURI);
-
- postRequest.setEntity(new StringEntity(message.toString())); - postRequest.setHeader("Content-Type", "application/json");
-                    HttpUriRequest request = postRequest;
-                    clear();
-
- final JSONMessage response = getHttpClient().execute(request, new JSONResponseHandler());
-                           runInForeground(new Runnable() {
-                               public void run() {
-                                   try {
-                                       if (response.isSuccessful()) {
- // Message was sent successfully and accepted by the server. - currentRevision = (new JSONObject(response.getBody())).getInt("currentRevision");
-                                       } else {
- // Message was sent successfully but rejected by the server. We must rollback our
-                                           // changes and update to the head 
revision.
-                                           logger.debug("Response 
unsuccessful");
- throw new RuntimeException("Out of sync with server");
-                                       }
-                                       } catch (JSONException e) {
-                                           throw new RuntimeException(e);
-                            } finally {
-                                               persistingToServer.set(false);
+                       synchronized (this) {
+                           if (persistingToServer) {
+                               flushDepth++;
+                               return;
+                           } else {
+                               if (messageQueue.size() > 0) {
+                                   persistingToServer = true;
+                               } else {
+                                   // If flush was called but no message was 
given,
+                                   // don't bother sending anything. (This is 
not a
+                                   // special precaution, it does happen.)
+                                   persistingToServer = false;
+                                   return;
+                               }
+                           }
+                       }
+
+                   URI serverURI = getServerURI();
+                HttpPost postRequest = new HttpPost(serverURI);
+ postRequest.setEntity(new StringEntity(messageQueue.get(0).toString()));
+                postRequest.setHeader("Content-Type", "application/json");
+                HttpUriRequest request = postRequest;
+                clear();
+ final JSONMessage response = getHttpClient().execute(request, new JSONResponseHandler());
+
+                runInForeground(new Runnable() {
+                       public void run() {
+                           try {
+                               if (response.isSuccessful()) {
+ // Message was sent successfully and accepted by the server. + currentRevision = (new JSONObject(response.getBody())).getInt("currentRevision");
+                               } else {
+ // Message was sent successfully but rejected by the server. We must rollback our
+                                   // changes and update to the head revision.
+                                   logger.debug("Response unsuccessful");
+ throw new RuntimeException("Out of sync with server");
+                               }
+                               } catch (JSONException e) {
+                                   throw new RuntimeException(e);
+                        } finally {
+                            synchronized (sender) {
+                                persistingToServer = false;
                                 try {
-                                    if (flushAgain) flush();
+                                    if (flushDepth > 0) {
+                                        flushDepth--;
+                                        flush();
+                                    }
                                 } catch (SPPersistenceException e) {
                                     throw new RuntimeException(e);
                                 }
-                                       }
-                        }
-                    });
-                   } else {
-                       flushAgain = true;
-                return;
-                   }
+                            }
+                               }
+                    }
+                });
                    } catch (UnsupportedEncodingException e) {
                 throw new SPPersistenceException(null, e);
             } catch (URISyntaxException e) {
@@ -617,11 +640,14 @@
                 throw new SPPersistenceException(null, e);
             } catch (RuntimeException e) {
                 throw new SPPersistenceException(null, e);
-            }
+            }
                }

                public void send(JSONObject content) throws 
SPPersistenceException {
-                       message.put(content);
+ if (content.toString().equals("{\"uuid\":null,\"method\":\"begin\"}")) {
+                       messageQueue.add(new JSONArray());
+                   }
+                   messageQueue.get(messageQueue.size() - 1).put(content);
                }

                public URI getServerURI() throws URISyntaxException {
@@ -699,11 +725,13 @@
                         runInForeground(new Runnable() {
                             public void run() {
                                 try {
-                                    if (!persistingToServer.get()) {
- int newRevision = json.getInt("currentRevision"); - if (currentRevision < newRevision) {
-                                            currentRevision = newRevision;
-                                            jsonDecoder.decode(jsonArray);
+                                    synchronized (sender) {
+                                        if (!persistingToServer) {
+ int newRevision = json.getInt("currentRevision"); + if (currentRevision < newRevision) { + currentRevision = newRevision; + jsonDecoder.decode(jsonArray);
+                                            }
                                         }
                                     }
                                 } catch (SPPersistenceException e) {

Reply via email to