Repository: samza
Updated Branches:
  refs/heads/master bdce47707 -> 210631cd5


SAMZA-2015: Refactor timer handling in tables to be consistent with stores

Currently, when timers are disabled, we do not instantiate timer instances for 
tables, which creates potential opportunities for NPEs in the future. This 
change refactors the tables to use the same approach as the store 
implementation, which is based on HighResolutionClock.

Author: Wei Song <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #835 from weisong44/SAMZA-2015


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/210631cd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/210631cd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/210631cd

Branch: refs/heads/master
Commit: 210631cd5c4487e99978189ee9b8d0b0c8847aba
Parents: bdce477
Author: Wei Song <[email protected]>
Authored: Fri Nov 30 12:52:59 2018 -0800
Committer: Wei Song <[email protected]>
Committed: Fri Nov 30 12:52:59 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/table/BaseReadableTable.java   |  9 +++++
 .../samza/table/caching/CachingTable.java       | 28 +++++++--------
 .../table/remote/RemoteReadWriteTable.java      |  8 ++---
 .../samza/table/remote/RemoteReadableTable.java |  8 ++---
 .../samza/table/utils/TableReadMetrics.java     | 15 ++------
 .../samza/table/utils/TableWriteMetrics.java    | 37 ++++++--------------
 .../samza/table/caching/TestCachingTable.java   | 14 +++-----
 .../samza/storage/kv/LocalReadWriteTable.java   |  4 +--
 .../samza/storage/kv/LocalReadableTable.java    |  4 +--
 .../storage/kv/TestLocalReadWriteTable.java     |  3 --
 .../storage/kv/TestLocalReadableTable.java      |  3 --
 11 files changed, 52 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java 
b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
index 7eaaa83..1dfd54c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
@@ -19,9 +19,11 @@
 package org.apache.samza.table;
 
 import com.google.common.base.Preconditions;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.table.utils.TableReadMetrics;
 import org.apache.samza.table.utils.TableWriteMetrics;
+import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,8 @@ abstract public class BaseReadableTable<K, V> implements 
ReadableTable<K, V> {
   protected TableReadMetrics readMetrics;
   protected TableWriteMetrics writeMetrics;
 
+  protected HighResolutionClock clock;
+
   /**
    * Construct an instance
    * @param tableId Id of the table
@@ -54,6 +58,11 @@ abstract public class BaseReadableTable<K, V> implements 
ReadableTable<K, V> {
 
   @Override
   public void init(Context context) {
+    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
+    clock = metricsConfig.getMetricsTimerEnabled()
+        ? () -> System.nanoTime()
+        : () -> 0L;
+
     readMetrics = new TableReadMetrics(context, this, tableId);
     if (this instanceof ReadWriteTable) {
       writeMetrics = new TableWriteMetrics(context, this, tableId);

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java 
b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index ac6188b..e63bf61 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -130,7 +130,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
       return CompletableFuture.completedFuture(value);
     }
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     missCount.incrementAndGet();
 
     return rdTable.getAsync(key).handle((result, e) -> {
@@ -140,7 +140,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
           if (result != null) {
             cache.put(key, result);
           }
-          updateTimer(readMetrics.getNs, System.nanoTime() - startNs);
+          updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
           return result;
         }
       });
@@ -168,7 +168,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
       return CompletableFuture.completedFuture(getAllResult);
     }
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get records for " + keys, e);
@@ -179,7 +179,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs);
+          updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -201,7 +201,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
     incCounter(writeMetrics.numPuts);
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " 
+ rdTable);
 
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     return rwTable.putAsync(key, value).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException(String.format("Failed to put a record, 
key=%s, value=%s", key, value), e);
@@ -212,7 +212,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
             cache.put(key, value);
           }
         }
-        updateTimer(writeMetrics.putNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -231,7 +231,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
     incCounter(writeMetrics.numPutAlls);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " 
+ rdTable);
     return rwTable.putAllAsync(records).handle((result, e) -> {
         if (e != null) {
@@ -240,7 +240,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
           cache.putAll(records);
         }
 
-        updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -259,7 +259,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
     incCounter(writeMetrics.numDeletes);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: 
" + rdTable);
     return rwTable.deleteAsync(key).handle((result, e) -> {
         if (e != null) {
@@ -267,7 +267,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        updateTimer(writeMetrics.deleteNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -284,7 +284,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
     incCounter(writeMetrics.numDeleteAlls);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: 
" + rdTable);
     return rwTable.deleteAllAsync(keys).handle((result, e) -> {
         if (e != null) {
@@ -292,7 +292,7 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs);
+        updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -300,10 +300,10 @@ public class CachingTable<K, V> extends 
BaseReadableTable<K, V>
   @Override
   public synchronized void flush() {
     incCounter(writeMetrics.numFlushes);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + 
rdTable);
     rwTable.flush();
-    updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
+    updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index b96087b..80c2cac 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -174,9 +174,9 @@ public class RemoteReadWriteTable<K, V> extends 
RemoteReadableTable<K, V>
   public void flush() {
     try {
       incCounter(writeMetrics.numFlushes);
-      long startNs = System.nanoTime();
+      long startNs = clock.nanoTime();
       writeFn.flush();
-      updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs);
+      updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = "Failed to flush remote store";
       logger.error(errMsg, e);
@@ -202,7 +202,7 @@ public class RemoteReadWriteTable<K, V> extends 
RemoteReadableTable<K, V>
   protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
       K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, 
Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
@@ -223,7 +223,7 @@ public class RemoteReadWriteTable<K, V> extends 
RemoteReadableTable<K, V>
       Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, 
CompletableFuture<Void>> method,
       Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttleRecords(records), 
tableExecutor)

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index e02650e..84a05b8 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -166,7 +166,7 @@ public class RemoteReadableTable<K, V> extends 
BaseReadableTable<K, V> {
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
       K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer 
timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
@@ -187,7 +187,7 @@ public class RemoteReadableTable<K, V> extends 
BaseReadableTable<K, V> {
   protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> 
rateLimiter,
       Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> 
method, Counter counter, Timer timer) {
     incCounter(counter);
-    final long startNs = System.nanoTime();
+    final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
             .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
@@ -207,12 +207,12 @@ public class RemoteReadableTable<K, V> extends 
BaseReadableTable<K, V> {
   protected  <T> CompletableFuture<T> completeExecution(CompletableFuture<T> 
ioFuture, long startNs, Timer timer) {
     if (callbackExecutor != null) {
       ioFuture.thenApplyAsync(r -> {
-          updateTimer(timer, System.nanoTime() - startNs);
+          updateTimer(timer, clock.nanoTime() - startNs);
           return r;
         }, callbackExecutor);
     } else {
       ioFuture.thenApply(r -> {
-          updateTimer(timer, System.nanoTime() - startNs);
+          updateTimer(timer, clock.nanoTime() - startNs);
           return r;
         });
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java 
b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
index 0775844..e77fcfd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
@@ -34,7 +33,6 @@ public class TableReadMetrics {
   public final Timer getAllNs;
   public final Counter numGets;
   public final Counter numGetAlls;
-  public final Timer getCallbackNs;
   public final Counter numMissedLookups;
 
   /**
@@ -47,19 +45,10 @@ public class TableReadMetrics {
   public TableReadMetrics(Context context, Table table, String tableId) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, 
tableId);
     numGets = tableMetricsUtil.newCounter("num-gets");
+    getNs = tableMetricsUtil.newTimer("get-ns");
     numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
     numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-
-    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      getNs = tableMetricsUtil.newTimer("get-ns");
-      getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-      getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns");
-    } else {
-      getNs = null;
-      getAllNs = null;
-      getCallbackNs = null;
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java 
b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
index 02af35f..bf65b74 100644
--- 
a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ 
b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
@@ -27,18 +26,16 @@ import org.apache.samza.table.Table;
 
 public class TableWriteMetrics {
 
-  public final Timer putNs;
-  public final Timer putAllNs;
-  public final Timer deleteNs;
-  public final Timer deleteAllNs;
-  public final Timer flushNs;
   public final Counter numPuts;
+  public final Timer putNs;
   public final Counter numPutAlls;
+  public final Timer putAllNs;
   public final Counter numDeletes;
+  public final Timer deleteNs;
   public final Counter numDeleteAlls;
+  public final Timer deleteAllNs;
   public final Counter numFlushes;
-  public final Timer putCallbackNs;
-  public final Timer deleteCallbackNs;
+  public final Timer flushNs;
 
   /**
    * Utility class that contains the default set of write metrics.
@@ -50,28 +47,14 @@ public class TableWriteMetrics {
   public TableWriteMetrics(Context context, Table table, String tableId) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, 
tableId);
     numPuts = tableMetricsUtil.newCounter("num-puts");
+    putNs = tableMetricsUtil.newTimer("put-ns");
     numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
     numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    deleteNs = tableMetricsUtil.newTimer("delete-ns");
     numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
     numFlushes = tableMetricsUtil.newCounter("num-flushes");
-
-    MetricsConfig metricsConfig = new 
MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      putNs = tableMetricsUtil.newTimer("put-ns");
-      putAllNs = tableMetricsUtil.newTimer("putAll-ns");
-      deleteNs = tableMetricsUtil.newTimer("delete-ns");
-      deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
-      flushNs = tableMetricsUtil.newTimer("flush-ns");
-      putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns");
-      deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns");
-    } else {
-      putNs = null;
-      putAllNs = null;
-      deleteNs = null;
-      deleteAllNs = null;
-      flushNs = null;
-      putCallbackNs = null;
-      deleteCallbackNs = null;
-    }
+    flushNs = tableMetricsUtil.newTimer("flush-ns");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java 
b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index daaba46..5a19767 100644
--- 
a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ 
b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -277,14 +277,14 @@ public class TestCachingTable {
   public void testGuavaCacheAndRemoteTable() throws Exception {
     String tableId = "testGuavaCacheAndRemoteTable";
     Cache<String, String> guavaCache = 
CacheBuilder.newBuilder().initialCapacity(100).build();
-    final ReadWriteTable<String, String> guavaTable = new 
GuavaCacheTable<>(tableId, guavaCache);
+    final ReadWriteTable<String, String> guavaTable = new 
GuavaCacheTable<>(tableId + "-cache", guavaCache);
 
     // It is okay to share rateLimitHelper and async helper for read/write in 
test
     TableRateLimiter<String, String> rateLimitHelper = 
mock(TableRateLimiter.class);
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = 
mock(TableWriteFunction.class);
     final RemoteReadWriteTable<String, String> remoteTable = new 
RemoteReadWriteTable<>(
-        tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
+        tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), 
Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
@@ -296,11 +296,11 @@ public class TestCachingTable {
     // 5 per read/write table (15)
     verify(metricsRegistry, times(24)).newCounter(any(), anyString());
 
-    // 3 per readable table (9)
-    // 7 per read/write table (21)
+    // 2 per readable table (6)
+    // 5 per read/write table (15)
     // 1 per remote readable table (1)
     // 1 per remote read/write table (1)
-    verify(metricsRegistry, times(32)).newTimer(any(), anyString());
+    verify(metricsRegistry, times(23)).newTimer(any(), anyString());
 
     // 1 per guava table (1)
     // 3 per caching table (2)
@@ -424,10 +424,6 @@ public class TestCachingTable {
     cachingTable.deleteAsync("");
     cachingTable.deleteAll(Collections.emptyList());
     cachingTable.deleteAllAsync(Collections.emptyList());
-
-    verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString());
-    verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any());
-    verify(metricsRegistry, times(0)).newTimer(any(), anyString());
   }
 
   private TableDescriptor createDummyTableDescriptor(String tableId) {

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
index 2704429..eae6bb0 100644
--- 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
@@ -146,9 +146,9 @@ public class LocalReadWriteTable<K, V> extends 
LocalReadableTable<K, V>
 
   private void instrument(Counter counter, Timer timer, Func0 func) {
     incCounter(counter);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     func.apply();
-    updateTimer(timer, System.nanoTime() - startNs);
+    updateTimer(timer, clock.nanoTime() - startNs);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java 
b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
index ba0d3cf..29ddb15 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
@@ -100,9 +100,9 @@ public class LocalReadableTable<K, V> extends 
BaseReadableTable<K, V> {
 
   private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
     incCounter(counter);
-    long startNs = System.nanoTime();
+    long startNs = clock.nanoTime();
     T result = func.get();
-    updateTimer(timer, System.nanoTime() - startNs);
+    updateTimer(timer, clock.nanoTime() - startNs);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
index 70cde27..044fab4 100644
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
@@ -212,9 +212,6 @@ public class TestLocalReadWriteTable {
     table.deleteAll(Collections.emptyList());
     table.deleteAllAsync(Collections.emptyList()).get();
     table.flush();
-    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(1, numFlushes.getCount());
     Assert.assertEquals(2, numPuts.getCount());
     Assert.assertEquals(0, numPutAlls.getCount());

http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
index 44802b0..e1c82d9 100644
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -126,9 +126,6 @@ public class TestLocalReadableTable {
     table.getAsync("").get();
     table.getAll(keys);
     table.getAllAsync(keys).get();
-    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString());
-    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
     Assert.assertEquals(2, numGets.getCount());
     Assert.assertEquals(4, numMissedLookups.getCount());
     Assert.assertEquals(2, numGetAlls.getCount());

Reply via email to