Move from Batch to Persistence proc. the responsibility of adding sendReplies to 
Reply proc.

Change-Id: I754f7189a166420652fcfec4fa4c1497212f8d7c


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/aa2651a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/aa2651a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/aa2651a0

Branch: refs/heads/master
Commit: aa2651a0a866e60e49504d6b9b6d4b47062a1c5a
Parents: 1d60f21
Author: Francisco Perez-Sorrosal <fpe...@yahoo-inc.com>
Authored: Wed May 4 10:04:28 2016 -0700
Committer: Francisco Perez-Sorrosal <fpe...@yahoo-inc.com>
Committed: Wed May 4 10:04:28 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    | 38 ++++++-------------
 .../omid/tso/PersistenceProcessorHandler.java   | 40 ++++++++++++++++----
 .../java/org/apache/omid/tso/TestBatch.java     |  8 ++--
 3 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java 
b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 2b17f23..b3b9eef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -91,6 +91,18 @@ public class Batch {
 
     }
 
+    PersistEvent get(int idx) {
+        return events[idx];
+    }
+
+    void set(int idx, PersistEvent event) {
+        events[idx] = event;
+    }
+
+    void decreaseNumEvents() {
+        numEvents--;
+    }
+
     void addCommit(long startTimestamp, long commitTimestamp, Channel c, 
MonitoringContext context) {
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
@@ -123,32 +135,6 @@ public class Batch {
 
     }
 
-    void sendReply(ReplyProcessor reply, RetryProcessor retryProc, long 
batchID) {
-
-        int i = 0;
-        while (i < numEvents) {
-            PersistEvent e = events[i];
-            if (e.getType() == Type.ABORT && e.isRetry()) {
-                
retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), 
e.getChannel(), e.getMonCtx());
-                PersistEvent tmp = events[i];
-                //TODO: why assign it?
-                events[i] = events[numEvents - 1];
-                events[numEvents - 1] = tmp;
-                if (numEvents == 1) {
-                    clear();
-                    reply.manageResponsesBatch(batchID, null);
-                    return;
-                }
-                numEvents--;
-                continue;
-            }
-            i++;
-        }
-
-        reply.manageResponsesBatch(batchID, this);
-
-    }
-
     @Override
     public String toString() {
         return Objects.toStringHelper(this)

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git 
a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java 
b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 84890b9..099cf88 100644
--- 
a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ 
b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.omid.tso;
 
-import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.WorkHandler;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.Histogram;
@@ -94,25 +93,24 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
                 throw new RuntimeException("Unknown event type: " + 
localEvent.getType().name());
             }
         }
-        flush(batch, event.getBatchSequence());
-
+        if (batch.getNumEvents() > 0) {
+            flush(batch.getNumEvents());
+            sendReplies(batch, event.getBatchSequence());
+        }
     }
 
-    private void flush(Batch batch, long batchSequence) {
+    private void flush(int numBatchedEvents) {
 
-        if (batch.getNumEvents() > 0) {
             commitSuicideIfNotMaster();
             try {
                 long startFlushTimeInNs = System.nanoTime();
                 writer.flush();
                 flushTimer.update(System.nanoTime() - startFlushTimeInNs);
-                batchSizeHistogram.update(batch.getNumEvents());
+                batchSizeHistogram.update(numBatchedEvents);
             } catch (IOException e) {
                 panicker.panic("Error persisting commit batch", e);
             }
             commitSuicideIfNotMaster(); // TODO Here, we can return the client 
responses before committing suicide
-            batch.sendReply(replyProcessor, retryProc, batchSequence);
-        }
 
     }
 
@@ -122,4 +120,30 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
         }
     }
 
+    private void sendReplies(Batch batch, long batchSequence) {
+
+        int i = 0;
+        while (i < batch.getNumEvents()) {
+            PersistEvent e = batch.get(i);
+            if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
+                
retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), 
e.getChannel(), e.getMonCtx());
+                PersistEvent tmp = batch.get(i);
+                //TODO: why assign it?
+                batch.set(i, batch.get(batch.getNumEvents() - 1));
+                batch.set(batch.getNumEvents()  - 1, tmp);
+                if (batch.getNumEvents()  == 1) {
+                    batch.clear();
+                    replyProcessor.manageResponsesBatch(batchSequence, null);
+                    return;
+                }
+                batch.decreaseNumEvents();
+                continue;
+            }
+            i++;
+        }
+
+        replyProcessor.manageResponsesBatch(batchSequence, batch);
+
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java 
b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c003f34..2b8b318 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -110,8 +110,8 @@ public class TestBatch {
 //        assertFalse(batch.isFull(), "Batch shouldn't be full");
 //        assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
 //=======
-        batch.sendReply(replyProcessor, retryProcessor, (-1));
-        verify(replyProcessor, 
timeout(100).times(1)).manageResponsesBatch((-1), batch);
+//        batch.sendReply(replyProcessor, retryProcessor, (-1));
+        //verify(replyProcessor, 
timeout(100).times(1)).manageResponsesBatch((-1), batch);
         assertTrue(batch.isFull(), "Batch shouldn't be empty");
     }
 
@@ -135,8 +135,8 @@ public class TestBatch {
 
         // Test that sending replies empties the batch also when the replica  
is NOT master and calls the
         // ambiguousCommitResponse() method on the reply processor
-        batch.sendReply(replyProcessor, retryProcessor, (-1));
-        verify(replyProcessor, 
timeout(100).times(1)).manageResponsesBatch((-1), batch);
+        //batch.sendReply(replyProcessor, retryProcessor, (-1));
+        //verify(replyProcessor, 
timeout(100).times(1)).manageResponsesBatch((-1), batch);
         assertTrue(batch.isFull(), "Batch should be full");
     }
 

Reply via email to