Repository: samza Updated Branches: refs/heads/master 5c33fe772 -> 42b187a21
SAMZA-1751: Refactored metrics for table API Refactored metrics for table API - Added TableMetricsUtil that encapsulates required parameters, maintains naming consistency and simplifies metrics creation API for tables. - Added metrics to local table - Maintained consistency between local, remote and caching table Author: Wei Song <[email protected]> Reviewers: Peng Du<[email protected]> Closes #555 from weisong44/table-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/42b187a2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/42b187a2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/42b187a2 Branch: refs/heads/master Commit: 42b187a217d5caa882047ca836070f986753a141 Parents: 5c33fe7 Author: Wei Song <[email protected]> Authored: Fri Jun 15 16:27:45 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Jun 15 16:27:45 2018 -0700 ---------------------------------------------------------------------- .../samza/table/caching/CachingTable.java | 39 ++++++- .../table/caching/CachingTableProvider.java | 4 +- .../table/caching/guava/GuavaCacheTable.java | 12 +-- .../caching/guava/GuavaCacheTableProvider.java | 1 + .../table/remote/RemoteReadWriteTable.java | 39 ++++--- .../samza/table/remote/RemoteReadableTable.java | 20 ++-- .../table/utils/DefaultTableReadMetrics.java | 55 ++++++++++ .../table/utils/DefaultTableWriteMetrics.java | 63 ++++++++++++ .../samza/table/utils/TableMetricsUtil.java | 101 +++++++++++++++++++ .../kv/LocalStoreBackedReadWriteTable.java | 33 +++++- .../kv/LocalStoreBackedReadableTable.java | 33 ++++-- .../TestLocalBaseStoreBackedTableProvider.java | 6 +- 12 files changed, 354 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java index 989828c..23e4f7f 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java @@ -26,10 +26,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.utils.DefaultTableReadMetrics; +import org.apache.samza.table.utils.DefaultTableWriteMetrics; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.task.TaskContext; import com.google.common.base.Preconditions; @@ -73,6 +75,10 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { // Use stripe based locking to allow parallelism of disjoint keys. private final Striped<Lock> stripedLocks; + // Metrics + private DefaultTableReadMetrics readMetrics; + private DefaultTableWriteMetrics writeMetrics; + // Common caching stats private AtomicLong hitCount = new AtomicLong(); private AtomicLong missCount = new AtomicLong(); @@ -91,14 +97,18 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { */ @Override public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - MetricsRegistry metricsRegistry = taskContext.getMetricsRegistry(); - metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-hit-rate", () -> hitRate())); - metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-miss-rate", () -> missRate())); - metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-req-count", () -> requestCount())); + readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); + writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + tableMetricsUtil.newGauge("hit-rate", () -> hitRate()); + tableMetricsUtil.newGauge("miss-rate", () -> missRate()); + tableMetricsUtil.newGauge("req-count", () -> requestCount()); } @Override public V get(K key) { + readMetrics.numGets.inc(); + long startNs = System.nanoTime(); V value = cache.get(key); if (value == null) { missCount.incrementAndGet(); @@ -121,18 +131,24 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { } else { hitCount.incrementAndGet(); } + readMetrics.getNs.update(System.nanoTime() - startNs); return value; } @Override public Map<K, V> getAll(List<K> keys) { + readMetrics.numGetAlls.inc(); + long startNs = System.nanoTime(); Map<K, V> getAllResult = new HashMap<>(); keys.stream().forEach(k -> getAllResult.put(k, get(k))); + readMetrics.getAllNs.update(System.nanoTime() - startNs); return getAllResult; } @Override public void put(K key, V value) { + writeMetrics.numPuts.inc(); + long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); Lock lock = stripedLocks.get(key); try { @@ -144,16 +160,22 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { } finally { lock.unlock(); } + writeMetrics.putNs.update(System.nanoTime() - startNs); } @Override public void putAll(List<Entry<K, V>> entries) { + writeMetrics.numPutAlls.inc(); + long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); entries.forEach(e -> put(e.getKey(), e.getValue())); + writeMetrics.putAllNs.update(System.nanoTime() - startNs); } @Override public void delete(K key) { + writeMetrics.numDeletes.inc(); + long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); Lock lock = stripedLocks.get(key); try { @@ -163,18 +185,25 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { } finally { lock.unlock(); } + writeMetrics.deleteNs.update(System.nanoTime() - startNs); } @Override public void deleteAll(List<K> keys) { + writeMetrics.numDeleteAlls.inc(); + long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); keys.stream().forEach(k -> delete(k)); + writeMetrics.deleteAllNs.update(System.nanoTime() - startNs); } @Override public synchronized void flush() { + writeMetrics.numFlushes.inc(); + long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable); rwTable.flush(); + writeMetrics.flushNs.update(System.nanoTime() - startNs); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java index 52d0c94..797d963 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java @@ -88,7 +88,9 @@ public class CachingTableProvider implements TableProvider { int stripes = Integer.parseInt(cachingTableSpec.getConfig().get(LOCK_STRIPES)); boolean isWriteAround = Boolean.parseBoolean(cachingTableSpec.getConfig().get(WRITE_AROUND)); - return new CachingTable(cachingTableSpec.getId(), table, cache, stripes, isWriteAround); + CachingTable cachingTable = new CachingTable(cachingTableSpec.getId(), table, cache, stripes, isWriteAround); + cachingTable.init(containerContext, taskContext); + return cachingTable; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java index 3f8ab51..fcded2f 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java @@ -24,10 +24,9 @@ import java.util.List; import java.util.Map; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.caching.SupplierGauge; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.task.TaskContext; import com.google.common.cache.Cache; @@ -51,17 +50,14 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> { this.cache = cache; } - private void registerMetrics(String tableId, Cache cache, MetricsRegistry metricsReg) { - // hit- and miss-rate are provided by CachingTable. - metricsReg.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-evict-count", () -> cache.stats().evictionCount())); - } - /** * {@inheritDoc} */ @Override public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - registerMetrics(tableId, cache, taskContext.getMetricsRegistry()); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + // hit- and miss-rate are provided by CachingTable. + tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount()); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java index 7395b22..1ba26c7 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java @@ -66,6 +66,7 @@ public class GuavaCacheTableProvider implements TableProvider { public Table getTable() { Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, guavaCacheTableSpec.getConfig().get(GUAVA_CACHE)); GuavaCacheTable table = new GuavaCacheTable(guavaCacheTableSpec.getId(), guavaCache); + table.init(containerContext, taskContext); guavaTables.add(table); return table; } http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index a640efb..95f8cfa 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -23,10 +23,11 @@ import java.util.List; import org.apache.samza.SamzaException; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.utils.DefaultTableWriteMetrics; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.task.TaskContext; import org.apache.samza.util.RateLimiter; @@ -42,17 +43,13 @@ import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG; * @param <V> the type of the value in this table */ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> { + protected final TableWriteFunction<K, V> writeFn; protected final CreditFunction<K, V> writeCreditFn; protected final boolean rateLimitWrites; - protected Timer putNs; - protected Timer deleteNs; - protected Timer flushNs; + protected DefaultTableWriteMetrics writeMetrics; protected Timer putThrottleNs; // use single timer for all write operations - protected Counter numPuts; - protected Counter numDeletes; - protected Counter numFlushes; public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, RateLimiter ratelimiter, CreditFunction<K, V> readCreditFn, CreditFunction<K, V> writeCreditFn) { @@ -70,13 +67,9 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void init(SamzaContainerContext containerContext, TaskContext taskContext) { super.init(containerContext, taskContext); - putNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-ns"); - putThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-throttle-ns"); - deleteNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-delete-ns"); - flushNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-flush-ns"); - numPuts = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-puts"); - numDeletes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-deletes"); - numFlushes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-flushes"); + writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + putThrottleNs = tableMetricsUtil.newTimer("put-throttle-ns"); } /** @@ -91,13 +84,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem } try { - numPuts.inc(); + writeMetrics.numPuts.inc(); if (rateLimitWrites) { throttle(key, value, RL_WRITE_TAG, writeCreditFn, putThrottleNs); } long startNs = System.nanoTime(); writeFn.put(key, value); - putNs.update(System.nanoTime() - startNs); + writeMetrics.putNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = String.format("Failed to put a record, key=%s, value=%s", key, value); logger.error(errMsg, e); @@ -111,7 +104,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void putAll(List<Entry<K, V>> entries) { try { + writeMetrics.numPutAlls.inc(); + long startNs = System.nanoTime(); writeFn.putAll(entries); + writeMetrics.putAllNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = String.format("Failed to put records: %s", entries); logger.error(errMsg, e); @@ -125,13 +121,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void delete(K key) { try { - numDeletes.inc(); + writeMetrics.numDeletes.inc(); if (rateLimitWrites) { throttle(key, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs); } long startNs = System.nanoTime(); writeFn.delete(key); - deleteNs.update(System.nanoTime() - startNs); + writeMetrics.deleteNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = String.format("Failed to delete a record, key=%s", key); logger.error(errMsg, e); @@ -145,7 +141,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void deleteAll(List<K> keys) { try { + writeMetrics.numDeleteAlls.inc(); writeFn.deleteAll(keys); + long startNs = System.nanoTime(); + writeMetrics.deleteAllNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = String.format("Failed to delete records, keys=%s", keys); logger.error(errMsg, e); @@ -159,13 +158,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void flush() { try { - numFlushes.inc(); + writeMetrics.numFlushes.inc(); if (rateLimitWrites) { throttle(null, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs); } long startNs = System.nanoTime(); writeFn.flush(); - flushNs.update(System.nanoTime() - startNs); + writeMetrics.flushNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = "Failed to flush remote store"; logger.error(errMsg, e); http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index ca8e96b..d919d2f 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -25,10 +25,11 @@ import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.utils.DefaultTableReadMetrics; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.task.TaskContext; import org.apache.samza.util.RateLimiter; import org.slf4j.Logger; @@ -63,6 +64,7 @@ import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; * @param <V> the type of the value in this table */ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { + protected final String tableId; protected final Logger logger; protected final TableReadFunction<K, V> readFn; @@ -71,9 +73,8 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { protected final CreditFunction<K, V> readCreditFn; protected final boolean rateLimitReads; - protected Timer getNs; + protected DefaultTableReadMetrics readMetrics; protected Timer getThrottleNs; - protected Counter numGets; /** * Construct a RemoteReadableTable instance @@ -101,9 +102,9 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { */ @Override public void init(SamzaContainerContext containerContext, TaskContext taskContext) { - getNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-ns"); - getThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-throttle-ns"); - numGets = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-gets"); + readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId); + getThrottleNs = tableMetricsUtil.newTimer("get-throttle-ns"); } /** @@ -112,13 +113,13 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { @Override public V get(K key) { try { - numGets.inc(); + readMetrics.numGets.inc(); if (rateLimitReads) { throttle(key, null, RL_READ_TAG, readCreditFn, getThrottleNs); } long startNs = System.nanoTime(); V result = readFn.get(key); - getNs.update(System.nanoTime() - startNs); + readMetrics.getNs.update(System.nanoTime() - startNs); return result; } catch (Exception e) { String errMsg = String.format("Failed to get a record, key=%s", key); @@ -134,7 +135,10 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { public Map<K, V> getAll(List<K> keys) { Map<K, V> result; try { + readMetrics.numGetAlls.inc(); + long startNs = System.nanoTime(); result = readFn.getAll(keys); + readMetrics.getAllNs.update(System.nanoTime() - startNs); } catch (Exception e) { String errMsg = "Failed to get some records"; logger.error(errMsg, e); http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java new file mode 100644 index 0000000..a327ae3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table.utils; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; +import org.apache.samza.task.TaskContext; + + +/** + * Utility class that contains the default set of read metrics. + */ +public class DefaultTableReadMetrics { + + public final Timer getNs; + public final Timer getAllNs; + public final Counter numGets; + public final Counter numGetAlls; + + /** + * Constructor based on container and task container context + * + * @param containerContext container context + * @param taskContext task context + * @param table underlying table + * @param tableId table Id + */ + public DefaultTableReadMetrics(SamzaContainerContext containerContext, TaskContext taskContext, + Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + getNs = tableMetricsUtil.newTimer("get-ns"); + getAllNs = tableMetricsUtil.newTimer("getAll-ns"); + numGets = tableMetricsUtil.newCounter("num-gets"); + numGetAlls = tableMetricsUtil.newCounter("num-getAlls"); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java new file mode 100644 index 0000000..150ee9a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table.utils; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; +import org.apache.samza.task.TaskContext; + + +public class DefaultTableWriteMetrics { + + public final Timer putNs; + public final Timer putAllNs; + public final Timer deleteNs; + public final Timer deleteAllNs; + public final Timer flushNs; + public final Counter numPuts; + public final Counter numPutAlls; + public final Counter numDeletes; + public final Counter numDeleteAlls; + public final Counter numFlushes; + + /** + * Utility class that contains the default set of write metrics. + * + * @param containerContext container context + * @param taskContext task context + * @param table underlying table + * @param tableId table Id + */ + public DefaultTableWriteMetrics(SamzaContainerContext containerContext, TaskContext taskContext, + Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + putNs = tableMetricsUtil.newTimer("put-ns"); + putAllNs = tableMetricsUtil.newTimer("putAll-ns"); + deleteNs = tableMetricsUtil.newTimer("delete-ns"); + deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); + flushNs = tableMetricsUtil.newTimer("flush-ns"); + numPuts = tableMetricsUtil.newCounter("num-puts"); + numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); + numDeletes = tableMetricsUtil.newCounter("num-deletes"); + numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); + numFlushes = tableMetricsUtil.newCounter("num-flushes"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java new file mode 100644 index 0000000..6805c64 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.utils; + +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; +import org.apache.samza.table.caching.SupplierGauge; +import org.apache.samza.task.TaskContext; + + +/** + * Utility class to generate metrics that helps to encapsulate required parameters, + * maintains naming consistency and simplifies metrics creation API for tables. + */ +public class TableMetricsUtil { + + private final MetricsRegistry metricsRegistry; + private final String groupName; + private final String tableId; + + /** + * Constructor based on container context + * + * @param containerContext container context + * @param taskContext task context + * @param table underlying table + * @param tableId table Id + */ + public TableMetricsUtil(SamzaContainerContext containerContext, TaskContext taskContext, + Table table, String tableId) { + + Preconditions.checkNotNull(containerContext); + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(tableId); + + this.metricsRegistry = taskContext == null // The table is at container level, when the task + ? containerContext.metricsRegistry // context passed in is null + : taskContext.getMetricsRegistry(); + this.groupName = table.getClass().getSimpleName(); + this.tableId = tableId; + } + + /** + * Create a new counter by delegating to the underlying metrics registry + * @param name name of the counter + * @return newly created counter + */ + public Counter newCounter(String name) { + return metricsRegistry.newCounter(groupName, getMetricFullName(name)); + } + + /** + * Create a new timer by delegating to the underlying metrics registry + * @param name name of the timer + * @return newly created timer + */ + public Timer newTimer(String name) { + return metricsRegistry.newTimer(groupName, getMetricFullName(name)); + } + + /** + * Create a new gauge by delegating to the underlying metrics registry + * @param name name of the gauge + * @param supplier a function that supplies the value + * @param <T> type of the value + * @return newly created gauge + */ + public <T> Gauge<T> newGauge(String name, Supplier<T> supplier) { + return metricsRegistry.newGauge(groupName, new SupplierGauge(getMetricFullName(name), supplier)); + } + + private String getMetricFullName(String name) { + return String.format("%s-%s", tableId, name); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java index 906ee1d..882ae0d 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java @@ -20,7 +20,10 @@ package org.apache.samza.storage.kv; import java.util.List; +import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.table.ReadWriteTable; +import org.apache.samza.table.utils.DefaultTableWriteMetrics; +import org.apache.samza.task.TaskContext; /** @@ -32,6 +35,8 @@ import org.apache.samza.table.ReadWriteTable; public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadableTable<K, V> implements ReadWriteTable<K, V> { + protected DefaultTableWriteMetrics writeMetrics; + /** * Constructs an instance of {@link LocalStoreBackedReadWriteTable} * @param kvStore the backing store @@ -40,10 +45,22 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab super(tableId, kvStore); } + /** + * {@inheritDoc} + */ + @Override + public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + super.init(containerContext, taskContext); + writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId); + } + @Override public void put(K key, V value) { if (value != null) { + writeMetrics.numPuts.inc(); + long startNs = System.nanoTime(); kvStore.put(key, value); + writeMetrics.putNs.update(System.nanoTime() - startNs); } else { delete(key); } @@ -51,22 +68,34 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab @Override public void putAll(List<Entry<K, V>> entries) { - entries.forEach(e -> kvStore.put(e.getKey(), e.getValue())); + writeMetrics.numPutAlls.inc(); + long startNs = System.nanoTime(); + kvStore.putAll(entries); + writeMetrics.putAllNs.update(System.nanoTime() - startNs); } @Override public void delete(K key) { + writeMetrics.numDeletes.inc(); + long startNs = System.nanoTime(); kvStore.delete(key); + writeMetrics.deleteNs.update(System.nanoTime() - startNs); } @Override public void deleteAll(List<K> keys) { - keys.forEach(k -> kvStore.delete(k)); + writeMetrics.numDeleteAlls.inc(); + long startNs = System.nanoTime(); + kvStore.deleteAll(keys); + writeMetrics.deleteAllNs.update(System.nanoTime() - startNs); } @Override public void flush() { + writeMetrics.numFlushes.inc(); + long startNs = System.nanoTime(); kvStore.flush(); + writeMetrics.flushNs.update(System.nanoTime() - startNs); } } http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java index 5ff58ab..8d79e0d 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java @@ -20,11 +20,12 @@ package org.apache.samza.storage.kv; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; - -import org.apache.samza.table.ReadableTable; import com.google.common.base.Preconditions; +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.table.ReadableTable; +import org.apache.samza.table.utils.DefaultTableReadMetrics; +import org.apache.samza.task.TaskContext; /** @@ -35,8 +36,10 @@ import com.google.common.base.Preconditions; */ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> { - protected KeyValueStore<K, V> kvStore; - protected String tableId; + protected final KeyValueStore<K, V> kvStore; + protected final String tableId; + + protected DefaultTableReadMetrics readMetrics; /** * Constructs an instance of {@link LocalStoreBackedReadableTable} @@ -49,14 +52,30 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> this.kvStore = kvStore; } + /** + * {@inheritDoc} + */ + @Override + public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId); + } + @Override public V get(K key) { - return kvStore.get(key); + readMetrics.numGets.inc(); + long startNs = System.nanoTime(); + V result = kvStore.get(key); + readMetrics.getNs.update(System.nanoTime() - startNs); + return result; } @Override public Map<K, V> getAll(List<K> keys) { - return keys.stream().collect(Collectors.toMap(k -> k, k -> kvStore.get(k))); + readMetrics.numGetAlls.inc(); + long startNs = System.nanoTime(); + Map<K, V> result = kvStore.getAll(keys); + readMetrics.getAllNs.update(System.nanoTime() - startNs); + return result; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/42b187a2/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java index d30c18f..56818b5 100644 --- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java @@ -25,9 +25,11 @@ import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.storage.StorageEngine; import org.apache.samza.table.TableSpec; import org.apache.samza.task.TaskContext; +import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Before; import org.junit.Test; @@ -57,9 +59,11 @@ public class TestLocalBaseStoreBackedTableProvider { @Test public void testInit() { StorageEngine store = mock(KeyValueStorageEngine.class); + SamzaContainerContext containerContext = mock(SamzaContainerContext.class); TaskContext taskContext = mock(TaskContext.class); when(taskContext.getStore(any())).thenReturn(store); - tableProvider.init(null, taskContext); + when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); + tableProvider.init(containerContext, taskContext); Assert.assertNotNull(tableProvider.getTable()); }
