Repository: samza Updated Branches: refs/heads/master bdce47707 -> 210631cd5
SAMZA-2015: Refactor timer handling in tables to be consistent with stores Currently, when the timer is disabled, we do not instantiate timer instances for tables; this introduces potential opportunities for NPEs in the future. This change refactors tables to use the same approach as the store implementation, which is based on HighResolutionClock. Author: Wei Song <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #835 from weisong44/SAMZA-2015 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/210631cd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/210631cd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/210631cd Branch: refs/heads/master Commit: 210631cd5c4487e99978189ee9b8d0b0c8847aba Parents: bdce477 Author: Wei Song <[email protected]> Authored: Fri Nov 30 12:52:59 2018 -0800 Committer: Wei Song <[email protected]> Committed: Fri Nov 30 12:52:59 2018 -0800 ---------------------------------------------------------------------- .../apache/samza/table/BaseReadableTable.java | 9 +++++ .../samza/table/caching/CachingTable.java | 28 +++++++-------- .../table/remote/RemoteReadWriteTable.java | 8 ++--- .../samza/table/remote/RemoteReadableTable.java | 8 ++--- .../samza/table/utils/TableReadMetrics.java | 15 ++------ .../samza/table/utils/TableWriteMetrics.java | 37 ++++++-------------- .../samza/table/caching/TestCachingTable.java | 14 +++----- .../samza/storage/kv/LocalReadWriteTable.java | 4 +-- .../samza/storage/kv/LocalReadableTable.java | 4 +-- .../storage/kv/TestLocalReadWriteTable.java | 3 -- .../storage/kv/TestLocalReadableTable.java | 3 -- 11 files changed, 52 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java index 7eaaa83..1dfd54c 100644 --- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java @@ -19,9 +19,11 @@ package org.apache.samza.table; import com.google.common.base.Preconditions; +import org.apache.samza.config.MetricsConfig; import org.apache.samza.context.Context; import org.apache.samza.table.utils.TableReadMetrics; import org.apache.samza.table.utils.TableWriteMetrics; +import org.apache.samza.util.HighResolutionClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,8 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> { protected TableReadMetrics readMetrics; protected TableWriteMetrics writeMetrics; + protected HighResolutionClock clock; + /** * Construct an instance * @param tableId Id of the table @@ -54,6 +58,11 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> { @Override public void init(Context context) { + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + clock = metricsConfig.getMetricsTimerEnabled() + ? 
() -> System.nanoTime() + : () -> 0L; + readMetrics = new TableReadMetrics(context, this, tableId); if (this instanceof ReadWriteTable) { writeMetrics = new TableWriteMetrics(context, this, tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java index ac6188b..e63bf61 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java @@ -130,7 +130,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> return CompletableFuture.completedFuture(value); } - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); missCount.incrementAndGet(); return rdTable.getAsync(key).handle((result, e) -> { @@ -140,7 +140,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> if (result != null) { cache.put(key, result); } - updateTimer(readMetrics.getNs, System.nanoTime() - startNs); + updateTimer(readMetrics.getNs, clock.nanoTime() - startNs); return result; } }); @@ -168,7 +168,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> return CompletableFuture.completedFuture(getAllResult); } - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); return rdTable.getAllAsync(missingKeys).handle((records, e) -> { if (e != null) { throw new SamzaException("Failed to get records for " + keys, e); @@ -179,7 +179,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> .collect(Collectors.toList())); getAllResult.putAll(records); } - updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs); + updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs); return getAllResult; } }); @@ -201,7 
+201,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> incCounter(writeMetrics.numPuts); Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); return rwTable.putAsync(key, value).handle((result, e) -> { if (e != null) { throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e); @@ -212,7 +212,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> cache.put(key, value); } } - updateTimer(writeMetrics.putNs, System.nanoTime() - startNs); + updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs); return result; }); } @@ -231,7 +231,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> @Override public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) { incCounter(writeMetrics.numPutAlls); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); return rwTable.putAllAsync(records).handle((result, e) -> { if (e != null) { @@ -240,7 +240,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> cache.putAll(records); } - updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs); + updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs); return result; }); } @@ -259,7 +259,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> @Override public CompletableFuture<Void> deleteAsync(K key) { incCounter(writeMetrics.numDeletes); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); return rwTable.deleteAsync(key).handle((result, e) -> { if (e != null) { @@ -267,7 +267,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> } else if (!isWriteAround) { cache.delete(key); } - updateTimer(writeMetrics.deleteNs, 
System.nanoTime() - startNs); + updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs); return result; }); } @@ -284,7 +284,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> @Override public CompletableFuture<Void> deleteAllAsync(List<K> keys) { incCounter(writeMetrics.numDeleteAlls); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); return rwTable.deleteAllAsync(keys).handle((result, e) -> { if (e != null) { @@ -292,7 +292,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> } else if (!isWriteAround) { cache.deleteAll(keys); } - updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs); + updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs); return result; }); } @@ -300,10 +300,10 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V> @Override public synchronized void flush() { incCounter(writeMetrics.numFlushes); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable); rwTable.flush(); - updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs); + updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index b96087b..80c2cac 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -174,9 +174,9 @@ public class RemoteReadWriteTable<K, V> extends 
RemoteReadableTable<K, V> public void flush() { try { incCounter(writeMetrics.numFlushes); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); writeFn.flush(); - updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs); + updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs); } catch (Exception e) { String errMsg = "Failed to flush remote store"; logger.error(errMsg, e); @@ -202,7 +202,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter, K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) { incCounter(counter); - final long startNs = System.nanoTime(); + final long startNs = clock.nanoTime(); CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() ? CompletableFuture .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor) @@ -223,7 +223,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, Counter counter, Timer timer) { incCounter(counter); - final long startNs = System.nanoTime(); + final long startNs = clock.nanoTime(); CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() ? 
CompletableFuture .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor) http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index e02650e..84a05b8 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -166,7 +166,7 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> { protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) { incCounter(counter); - final long startNs = System.nanoTime(); + final long startNs = clock.nanoTime(); CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ? CompletableFuture .runAsync(() -> rateLimiter.throttle(key), tableExecutor) @@ -187,7 +187,7 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> { protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) { incCounter(counter); - final long startNs = System.nanoTime(); + final long startNs = clock.nanoTime(); CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ? 
CompletableFuture .runAsync(() -> rateLimiter.throttle(keys), tableExecutor) @@ -207,12 +207,12 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> { protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) { if (callbackExecutor != null) { ioFuture.thenApplyAsync(r -> { - updateTimer(timer, System.nanoTime() - startNs); + updateTimer(timer, clock.nanoTime() - startNs); return r; }, callbackExecutor); } else { ioFuture.thenApply(r -> { - updateTimer(timer, System.nanoTime() - startNs); + updateTimer(timer, clock.nanoTime() - startNs); return r; }); } http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java index 0775844..e77fcfd 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java @@ -18,7 +18,6 @@ */ package org.apache.samza.table.utils; -import org.apache.samza.config.MetricsConfig; import org.apache.samza.context.Context; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; @@ -34,7 +33,6 @@ public class TableReadMetrics { public final Timer getAllNs; public final Counter numGets; public final Counter numGetAlls; - public final Timer getCallbackNs; public final Counter numMissedLookups; /** @@ -47,19 +45,10 @@ public class TableReadMetrics { public TableReadMetrics(Context context, Table table, String tableId) { TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); numGets = tableMetricsUtil.newCounter("num-gets"); + getNs = tableMetricsUtil.newTimer("get-ns"); numGetAlls = 
tableMetricsUtil.newCounter("num-getAlls"); + getAllNs = tableMetricsUtil.newTimer("getAll-ns"); numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups"); - - MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); - if (metricsConfig.getMetricsTimerEnabled()) { - getNs = tableMetricsUtil.newTimer("get-ns"); - getAllNs = tableMetricsUtil.newTimer("getAll-ns"); - getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns"); - } else { - getNs = null; - getAllNs = null; - getCallbackNs = null; - } } } http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java index 02af35f..bf65b74 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java @@ -18,7 +18,6 @@ */ package org.apache.samza.table.utils; -import org.apache.samza.config.MetricsConfig; import org.apache.samza.context.Context; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; @@ -27,18 +26,16 @@ import org.apache.samza.table.Table; public class TableWriteMetrics { - public final Timer putNs; - public final Timer putAllNs; - public final Timer deleteNs; - public final Timer deleteAllNs; - public final Timer flushNs; public final Counter numPuts; + public final Timer putNs; public final Counter numPutAlls; + public final Timer putAllNs; public final Counter numDeletes; + public final Timer deleteNs; public final Counter numDeleteAlls; + public final Timer deleteAllNs; public final Counter numFlushes; - public final Timer putCallbackNs; - public final Timer deleteCallbackNs; + public final Timer flushNs; /** * Utility 
class that contains the default set of write metrics. @@ -50,28 +47,14 @@ public class TableWriteMetrics { public TableWriteMetrics(Context context, Table table, String tableId) { TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); numPuts = tableMetricsUtil.newCounter("num-puts"); + putNs = tableMetricsUtil.newTimer("put-ns"); numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); + putAllNs = tableMetricsUtil.newTimer("putAll-ns"); numDeletes = tableMetricsUtil.newCounter("num-deletes"); + deleteNs = tableMetricsUtil.newTimer("delete-ns"); numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); + deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); numFlushes = tableMetricsUtil.newCounter("num-flushes"); - - MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); - if (metricsConfig.getMetricsTimerEnabled()) { - putNs = tableMetricsUtil.newTimer("put-ns"); - putAllNs = tableMetricsUtil.newTimer("putAll-ns"); - deleteNs = tableMetricsUtil.newTimer("delete-ns"); - deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); - flushNs = tableMetricsUtil.newTimer("flush-ns"); - putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns"); - deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns"); - } else { - putNs = null; - putAllNs = null; - deleteNs = null; - deleteAllNs = null; - flushNs = null; - putCallbackNs = null; - deleteCallbackNs = null; - } + flushNs = tableMetricsUtil.newTimer("flush-ns"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index daaba46..5a19767 100644 --- 
a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -277,14 +277,14 @@ public class TestCachingTable { public void testGuavaCacheAndRemoteTable() throws Exception { String tableId = "testGuavaCacheAndRemoteTable"; Cache<String, String> guavaCache = CacheBuilder.newBuilder().initialCapacity(100).build(); - final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId, guavaCache); + final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId + "-cache", guavaCache); // It is okay to share rateLimitHelper and async helper for read/write in test TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class); TableReadFunction<String, String> readFn = mock(TableReadFunction.class); TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>( - tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper, + tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()); final CachingTable<String, String> cachingTable = new CachingTable<>( @@ -296,11 +296,11 @@ public class TestCachingTable { // 5 per read/write table (15) verify(metricsRegistry, times(24)).newCounter(any(), anyString()); - // 3 per readable table (9) - // 7 per read/write table (21) + // 2 per readable table (6) + // 5 per read/write table (15) // 1 per remote readable table (1) // 1 per remote read/write table (1) - verify(metricsRegistry, times(32)).newTimer(any(), anyString()); + verify(metricsRegistry, times(23)).newTimer(any(), anyString()); // 1 per guava table (1) // 3 per caching table (2) @@ -424,10 +424,6 @@ public class TestCachingTable { cachingTable.deleteAsync(""); cachingTable.deleteAll(Collections.emptyList()); 
cachingTable.deleteAllAsync(Collections.emptyList()); - - verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString()); - verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any()); - verify(metricsRegistry, times(0)).newTimer(any(), anyString()); } private TableDescriptor createDummyTableDescriptor(String tableId) { http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java index 2704429..eae6bb0 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java @@ -146,9 +146,9 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> private void instrument(Counter counter, Timer timer, Func0 func) { incCounter(counter); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); func.apply(); - updateTimer(timer, System.nanoTime() - startNs); + updateTimer(timer, clock.nanoTime() - startNs); } } http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java index ba0d3cf..29ddb15 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java @@ -100,9 +100,9 @@ public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> { private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) { 
incCounter(counter); - long startNs = System.nanoTime(); + long startNs = clock.nanoTime(); T result = func.get(); - updateTimer(timer, System.nanoTime() - startNs); + updateTimer(timer, clock.nanoTime() - startNs); return result; } } http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java index 70cde27..044fab4 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java @@ -212,9 +212,6 @@ public class TestLocalReadWriteTable { table.deleteAll(Collections.emptyList()); table.deleteAllAsync(Collections.emptyList()).get(); table.flush(); - verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString()); - verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); - verify(metricsRegistry, times(0)).newGauge(anyString(), any()); Assert.assertEquals(1, numFlushes.getCount()); Assert.assertEquals(2, numPuts.getCount()); Assert.assertEquals(0, numPutAlls.getCount()); http://git-wip-us.apache.org/repos/asf/samza/blob/210631cd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java index 44802b0..e1c82d9 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java @@ -126,9 +126,6 @@ public class TestLocalReadableTable { 
table.getAsync("").get(); table.getAll(keys); table.getAllAsync(keys).get(); - verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString()); - verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); - verify(metricsRegistry, times(0)).newGauge(anyString(), any()); Assert.assertEquals(2, numGets.getCount()); Assert.assertEquals(4, numMissedLookups.getCount()); Assert.assertEquals(2, numGetAlls.getCount());
