Repository: incubator-omid
Updated Branches:
  refs/heads/0.9.0.0 57968cc1d -> 558c640b8


[OMID-89] Fix metrics in Persistence processor handlers

Change-Id: Ieb6cdf5d47cb113ea6cc8e9799bfbdef20b66565


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/558c640b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/558c640b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/558c640b

Branch: refs/heads/0.9.0.0
Commit: 558c640b849e5abc5311b4d35796756b4b05e7a6
Parents: 57968cc
Author: Francisco Perez-Sorrosal <fperezsorro...@apache.org>
Authored: Fri Feb 2 15:09:02 2018 -0800
Committer: Francisco Perez-Sorrosal <fperezsorro...@apache.org>
Committed: Wed Feb 14 10:42:35 2018 -0800

----------------------------------------------------------------------
 .../omid/tso/PersistenceProcessorHandler.java   | 34 +++++++++++++++++---
 .../tso/TestPersistenceProcessorHandler.java    | 30 +++++++++++++++++
 2 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/558c640b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git 
a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java 
b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 07241f0..8a93fc4 100644
--- 
a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ 
b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.lmax.disruptor.WorkHandler;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.Histogram;
@@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import javax.inject.Inject;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.codahale.metrics.MetricRegistry.name;
 import static org.apache.omid.tso.PersistEvent.Type.*;
@@ -36,6 +39,11 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PersistenceProcessorHandler.class);
 
+    @VisibleForTesting
+    static final AtomicInteger consecutiveSequenceCreator = new 
AtomicInteger(0);
+
+    private final String id;
+
     private final String tsoHostAndPort;
     private final LeaseManagement leaseManager;
 
@@ -44,13 +52,14 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
     private final CommitTable.Writer writer;
     final Panicker panicker;
 
+    // Metrics in this component
     private final Timer flushTimer;
     private final Histogram batchSizeHistogram;
     private final Histogram flushedCommitEventsHistogram;
 
     @Inject
     PersistenceProcessorHandler(MetricsRegistry metrics,
-                                String tsoHostAndPort,
+                                String tsoHostAndPort, // TODO This should not 
be passed here. Should be part of panicker
                                 LeaseManagement leaseManager,
                                 CommitTable commitTable,
                                 ReplyProcessor replyProcessor,
@@ -58,6 +67,7 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
                                 Panicker panicker)
     throws InterruptedException, ExecutionException, IOException {
 
+        this.id = String.valueOf(consecutiveSequenceCreator.getAndIncrement());
         this.tsoHostAndPort = tsoHostAndPort;
         this.leaseManager = leaseManager;
         this.writer = commitTable.getWriter();
@@ -65,13 +75,20 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
         this.retryProcessor = retryProcessor;
         this.panicker = panicker;
 
-        // Metrics in this component
-        flushTimer = metrics.timer(name("tso", "persist", "flush", "latency"));
-        flushedCommitEventsHistogram = metrics.histogram(name("tso", 
"persist", "flushed", "commits", "size"));
-        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batch", 
"size"));
+        // Metrics setup
+        String flushTimerName = name("tso", "persistence-processor-handler", 
id, "flush", "latency");
+        flushTimer = metrics.timer(flushTimerName);
+        String flushedCommitEventsName = name("tso", 
"persistence-processor-handler", id, "flushed", "commits", "size");
+        flushedCommitEventsHistogram = 
metrics.histogram(flushedCommitEventsName);
+        String batchSizeMetricsName = name("tso", 
"persistence-processor-handler", id, "batch", "size");
+        batchSizeHistogram = metrics.histogram(batchSizeMetricsName);
 
     }
 
+    public String getId() {
+        return id;
+    }
+
     @Override
     public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) 
throws Exception {
 
@@ -179,4 +196,11 @@ public class PersistenceProcessorHandler implements 
WorkHandler<PersistenceProce
         batch.set(lastIdx, tmpEvent);
     }
 
+    @Override
+    public String toString() {
+
+        return Objects.toStringHelper(this).add("id", id).toString();
+
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/558c640b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git 
a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
 
b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index d60d019..43f354f 100644
--- 
a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ 
b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -126,6 +126,36 @@ public class TestPersistenceProcessorHandler {
         Mockito.reset(mockWriter);
     }
 
+    @Test(timeOut = 1_000)
+    public void testPersistentProcessorHandlerIdsAreCreatedConsecutive() 
throws Exception {
+
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setNumConcurrentCTWriters(32);
+
+        PersistenceProcessorHandler[] handlers = new 
PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+            handlers[i] = new PersistenceProcessorHandler(metrics,
+                                                          "localhost:1234",
+                                                          
mock(LeaseManager.class),
+                                                          commitTable,
+                                                          
mock(ReplyProcessor.class),
+                                                          retryProcessor,
+                                                          panicker);
+        }
+
+        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
+            // Required to generalize the cases when other tests have 
increased the static variable assigning the ids
+            if (i + 1 < tsoConfig.getNumConcurrentCTWriters()) {
+                int followingHandlerIdAsInt = Integer.valueOf(handlers[i + 
1].getId());
+                assertEquals(handlers[i].getId(), 
String.valueOf(followingHandlerIdAsInt - 1));
+            } else { // Final case: compare with the last element that the 
sequence creator assigned
+                int followingHandlerIdAsInt = 
PersistenceProcessorHandler.consecutiveSequenceCreator.get();
+                assertEquals(handlers[i].getId(), 
String.valueOf(followingHandlerIdAsInt - 1));
+            }
+        }
+
+    }
+
     @Test(timeOut = 10_000)
     public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
 

Reply via email to