SAMZA-2004: Add ability to disable table metrics. For jobs with very high throughput, it is desirable to disable metrics on tables. This change introduces an option on the table descriptor to disable all metrics for a table.
Author: Wei Song <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #822 from weisong44/SAMZA-2004-2 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5069f1dd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5069f1dd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5069f1dd Branch: refs/heads/master Commit: 5069f1ddb74587ad6fb11d515bece45bd2a8f3bc Parents: e25e0da Author: Wei Song <[email protected]> Authored: Tue Nov 27 10:40:34 2018 -0800 Committer: Wei Song <[email protected]> Committed: Tue Nov 27 10:40:34 2018 -0800 ---------------------------------------------------------------------- .../documentation/versioned/api/table-api.md | 1 + .../apache/samza/config/JavaTableConfig.java | 2 +- .../table/descriptors/BaseTableDescriptor.java | 13 +- .../table/descriptors/LocalTableDescriptor.java | 3 - .../table/descriptors/TableDescriptor.java | 9 - .../samza/table/remote/TableRateLimiter.java | 4 +- .../apache/samza/table/BaseReadableTable.java | 66 +++++ .../apache/samza/table/BaseTableProvider.java | 4 +- .../samza/table/caching/CachingTable.java | 48 ++-- .../table/caching/guava/GuavaCacheTable.java | 9 +- .../table/remote/RemoteReadWriteTable.java | 82 +++++-- .../samza/table/remote/RemoteReadableTable.java | 145 +++-------- .../table/utils/DefaultTableReadMetrics.java | 56 ----- .../table/utils/DefaultTableWriteMetrics.java | 64 ----- .../samza/table/utils/TableMetricsUtil.java | 14 +- .../samza/table/utils/TableReadMetrics.java | 65 +++++ .../samza/table/utils/TableWriteMetrics.java | 77 ++++++ .../samza/config/TestJavaTableConfig.java | 23 +- .../samza/table/caching/TestCachingTable.java | 90 ++++++- .../table/remote/TestRemoteReadWriteTable.java | 2 +- .../descriptors/TestRemoteTableDescriptor.java | 12 +- .../samza/storage/kv/LocalReadWriteTable.java | 49 ++-- .../samza/storage/kv/LocalReadableTable.java | 46 ++-- .../storage/kv/TestLocalReadWriteTable.java | 
244 +++++++++++++++++++ .../storage/kv/TestLocalReadableTable.java | 158 ++++++++++++ .../storage/kv/TestLocalTableProvider.java | 66 +++++ .../kv/descriptors/TestLocalTableProvider.java | 60 ----- .../sql/impl/ConfigBasedIOResolverFactory.java | 2 - .../apache/samza/test/table/TestLocalTable.java | 64 +---- .../samza/test/table/TestRemoteTable.java | 7 +- 30 files changed, 995 insertions(+), 490 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/docs/learn/documentation/versioned/api/table-api.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md index 0a9c33c..f53d9f5 100644 --- a/docs/learn/documentation/versioned/api/table-api.md +++ b/docs/learn/documentation/versioned/api/table-api.md @@ -248,6 +248,7 @@ The table below summarizes table metrics: |`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` operations| |`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations |`num-getAlls`|`ReadableTable`|Count of `getAll/getAllAsync()` operations +|`num-missed-lookups`|`ReadableTable`|Count of missed get/getAll() operations |`put-ns`|`ReadWriteTable`|Average latency of `put/putAsync()` operations |`putAll-ns`|`ReadWriteTable`|Average latency of `putAll/putAllAsync()` operations |`num-puts`|`ReadWriteTable`|Count of `put/putAsync()` operations http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java index c4381a1..ee0045a 100644 --- a/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java +++ 
b/samza-api/src/main/java/org/apache/samza/config/JavaTableConfig.java @@ -81,7 +81,7 @@ public class JavaTableConfig extends MapConfig { * @param tableId Id of the table * @return serde retistry key */ - public String getValueSerde(String tableId) { + public String getMsgSerde(String tableId) { return get(String.format(STORE_MSG_SERDE, tableId), null); } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java index d660276..26c2ae3 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java @@ -19,11 +19,11 @@ package org.apache.samza.table.descriptors; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JavaTableConfig; @@ -34,7 +34,6 @@ import org.apache.samza.config.JavaTableConfig; * @param <V> the type of the value in this table * @param <D> the type of the concrete table descriptor */ [email protected] abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>> implements TableDescriptor<K, V, D> { @@ -50,7 +49,13 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, this.tableId = tableId; } - @Override + /** + * Add a configuration entry for the table + * + * @param key the key + * @param value the value + * @return this table descriptor instance + */ public D withConfig(String key, String value) { config.put(key, value); return (D) this; @@ 
-64,6 +69,8 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, @Override public Map<String, String> toConfig(Config jobConfig) { + Preconditions.checkNotNull(jobConfig, "Job config is null"); + validate(); Map<String, String> tableConfig = new HashMap<>(config); http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java index d194091..1ebb580 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java @@ -28,7 +28,6 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaTableConfig; import org.apache.samza.serializers.KVSerde; import org.apache.samza.storage.SideInputsProcessor; import org.apache.samza.table.utils.SerdeUtils; @@ -143,8 +142,6 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor< Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig)); - JavaTableConfig javaTableConfig = new JavaTableConfig(jobConfig); - if (sideInputs != null && !sideInputs.isEmpty()) { sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format( "Side input stream %s doesn't confirm to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN))); http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java ---------------------------------------------------------------------- diff --git 
a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java index 806d158..81a98e6 100644 --- a/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java +++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java @@ -67,15 +67,6 @@ public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> { String getTableId(); /** - * Add a configuration entry for the table - * - * @param key the key - * @param value the value - * @return this table descriptor instance - */ - D withConfig(String key, String value); - - /** * Generate configuration for this table descriptor, the generated configuration * should be the complete configuration for this table that can be directly * included in the job configuration. http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java index c67a648..37d6385 100644 --- a/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java +++ b/samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java @@ -122,7 +122,9 @@ public class TableRateLimiter<K, V> { long startNs = System.nanoTime(); rateLimiter.acquire(Collections.singletonMap(tag, credits)); - waitTimeMetric.update(System.nanoTime() - startNs); + if (waitTimeMetric != null) { + waitTimeMetric.update(System.nanoTime() - startNs); + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java new file mode 100644 index 0000000..7eaaa83 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table; + +import com.google.common.base.Preconditions; +import org.apache.samza.context.Context; +import org.apache.samza.table.utils.TableReadMetrics; +import org.apache.samza.table.utils.TableWriteMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for all readable tables + * + * @param <K> the type of the key in this table + * @param <V> the type of the value in this table + */ +abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> { + + protected final Logger logger; + + protected final String tableId; + + protected TableReadMetrics readMetrics; + protected TableWriteMetrics writeMetrics; + + /** + * Construct an instance + * @param tableId Id of the table + */ + public BaseReadableTable(String tableId) { + Preconditions.checkArgument(tableId != null & !tableId.isEmpty(), + String.format("Invalid table Id: %s", tableId)); + this.tableId = tableId; + this.logger = LoggerFactory.getLogger(getClass().getName() + "." 
+ tableId); + } + + @Override + public void init(Context context) { + readMetrics = new TableReadMetrics(context, this, tableId); + if (this instanceof ReadWriteTable) { + writeMetrics = new TableWriteMetrics(context, this, tableId); + } + } + + public String getTableId() { + return tableId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java index 7ad423d..f2c0b08 100644 --- a/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/BaseTableProvider.java @@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory; */ abstract public class BaseTableProvider implements TableProvider { - final protected Logger logger = LoggerFactory.getLogger(getClass()); + protected final Logger logger = LoggerFactory.getLogger(getClass()); - final protected String tableId; + protected final String tableId; protected Context context; http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java index 703a6ff..ac6188b 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java @@ -23,10 +23,9 @@ import com.google.common.base.Preconditions; import org.apache.samza.SamzaException; import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.BaseReadableTable; import 
org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.utils.DefaultTableReadMetrics; -import org.apache.samza.table.utils.DefaultTableWriteMetrics; import org.apache.samza.table.utils.TableMetricsUtil; import java.util.ArrayList; @@ -37,6 +36,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; + /** * A composite table incorporating a cache with a Samza table. The cache is @@ -62,23 +64,20 @@ import java.util.stream.Collectors; * @param <K> type of the table key * @param <V> type of the table value */ -public class CachingTable<K, V> implements ReadWriteTable<K, V> { - private final String tableId; +public class CachingTable<K, V> extends BaseReadableTable<K, V> + implements ReadWriteTable<K, V> { + private final ReadableTable<K, V> rdTable; private final ReadWriteTable<K, V> rwTable; private final ReadWriteTable<K, V> cache; private final boolean isWriteAround; - // Metrics - private DefaultTableReadMetrics readMetrics; - private DefaultTableWriteMetrics writeMetrics; - // Common caching stats private AtomicLong hitCount = new AtomicLong(); private AtomicLong missCount = new AtomicLong(); public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) { - this.tableId = tableId; + super(tableId); this.rdTable = table; this.rwTable = table instanceof ReadWriteTable ? 
(ReadWriteTable) table : null; this.cache = cache; @@ -87,8 +86,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public void init(Context context) { - readMetrics = new DefaultTableReadMetrics(context, this, tableId); - writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); + super.init(context); TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); tableMetricsUtil.newGauge("hit-rate", () -> hitRate()); tableMetricsUtil.newGauge("miss-rate", () -> missRate()); @@ -125,7 +123,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<V> getAsync(K key) { - readMetrics.numGets.inc(); + incCounter(readMetrics.numGets); V value = cache.get(key); if (value != null) { hitCount.incrementAndGet(); @@ -142,7 +140,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { if (result != null) { cache.put(key, result); } - readMetrics.getNs.update(System.nanoTime() - startNs); + updateTimer(readMetrics.getNs, System.nanoTime() - startNs); return result; } }); @@ -161,7 +159,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) { - readMetrics.numGetAlls.inc(); + incCounter(readMetrics.numGetAlls); // Make a copy of entries which might be immutable Map<K, V> getAllResult = new HashMap<>(); List<K> missingKeys = lookupCache(keys, getAllResult); @@ -181,7 +179,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { .collect(Collectors.toList())); getAllResult.putAll(records); } - readMetrics.getAllNs.update(System.nanoTime() - startNs); + updateTimer(readMetrics.getAllNs, System.nanoTime() - startNs); return getAllResult; } }); @@ -200,7 +198,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<Void> putAsync(K key, V value) { - writeMetrics.numPuts.inc(); + incCounter(writeMetrics.numPuts); 
Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); long startNs = System.nanoTime(); @@ -214,7 +212,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { cache.put(key, value); } } - writeMetrics.putNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.putNs, System.nanoTime() - startNs); return result; }); } @@ -232,7 +230,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) { - writeMetrics.numPutAlls.inc(); + incCounter(writeMetrics.numPutAlls); long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable); return rwTable.putAllAsync(records).handle((result, e) -> { @@ -242,7 +240,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { cache.putAll(records); } - writeMetrics.putAllNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.putAllNs, System.nanoTime() - startNs); return result; }); } @@ -260,7 +258,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<Void> deleteAsync(K key) { - writeMetrics.numDeletes.inc(); + incCounter(writeMetrics.numDeletes); long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); return rwTable.deleteAsync(key).handle((result, e) -> { @@ -269,7 +267,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { } else if (!isWriteAround) { cache.delete(key); } - writeMetrics.deleteNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.deleteNs, System.nanoTime() - startNs); return result; }); } @@ -285,7 +283,7 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { @Override public CompletableFuture<Void> deleteAllAsync(List<K> keys) { - writeMetrics.numDeleteAlls.inc(); + incCounter(writeMetrics.numDeleteAlls); long startNs = 
System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable); return rwTable.deleteAllAsync(keys).handle((result, e) -> { @@ -294,18 +292,18 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> { } else if (!isWriteAround) { cache.deleteAll(keys); } - writeMetrics.deleteAllNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.deleteAllNs, System.nanoTime() - startNs); return result; }); } @Override public synchronized void flush() { - writeMetrics.numFlushes.inc(); + incCounter(writeMetrics.numFlushes); long startNs = System.nanoTime(); Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable); rwTable.flush(); - writeMetrics.flushNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java index 391f068..b75a0bc 100644 --- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java @@ -23,6 +23,7 @@ import com.google.common.cache.Cache; import org.apache.samza.SamzaException; import org.apache.samza.context.Context; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.BaseReadableTable; import org.apache.samza.table.ReadWriteTable; import org.apache.samza.table.utils.TableMetricsUtil; @@ -39,17 +40,19 @@ import java.util.concurrent.CompletableFuture; * @param <K> type of the key in the cache * @param <V> type of the value in the cache */ -public class GuavaCacheTable<K, V> implements 
ReadWriteTable<K, V> { - private final String tableId; +public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V> + implements ReadWriteTable<K, V> { + private final Cache<K, V> cache; public GuavaCacheTable(String tableId, Cache<K, V> cache) { - this.tableId = tableId; + super(tableId); this.cache = cache; } @Override public void init(Context context) { + super.init(context); TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); // hit- and miss-rate are provided by CachingTable. tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount()); http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index 60ac4b7..b96087b 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -21,11 +21,16 @@ package org.apache.samza.table.remote; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; import org.apache.samza.SamzaException; +import org.apache.samza.config.MetricsConfig; import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.utils.DefaultTableWriteMetrics; import org.apache.samza.table.utils.TableMetricsUtil; import java.util.List; @@ -33,6 +38,9 @@ import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; + /** * Remote store backed read writable table @@ -40,9 +48,8 @@ import java.util.stream.Collectors; * @param <K> the type of the key in this table * @param <V> the type of the value in this table */ -public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> { - - private DefaultTableWriteMetrics writeMetrics; +public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> + implements ReadWriteTable<K, V> { protected final TableWriteFunction<K, V> writeFn; protected final TableRateLimiter writeRateLimiter; @@ -59,9 +66,11 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void init(Context context) { super.init(context); - writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); - writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns")); + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + if (metricsConfig.getMetricsTimerEnabled()) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); + writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns")); + } } @Override @@ -80,8 +89,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem return deleteAsync(key); } - writeMetrics.numPuts.inc(); - return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.putNs) + return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs) .exceptionally(e -> { throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e); }); @@ -103,8 +111,6 @@ public 
class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem return CompletableFuture.completedFuture(null); } - writeMetrics.numPutAlls.inc(); - List<K> deleteKeys = records.stream() .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList()); @@ -117,7 +123,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem // Return the combined future return CompletableFuture.allOf( deleteFuture, - executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.putAllNs)) + executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs)) .exceptionally(e -> { String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(",")); throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e); @@ -136,8 +142,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public CompletableFuture<Void> deleteAsync(K key) { Preconditions.checkNotNull(key); - writeMetrics.numDeletes.inc(); - return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.deleteNs) + return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs) .exceptionally(e -> { throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e); }); @@ -159,8 +164,7 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem return CompletableFuture.completedFuture(null); } - writeMetrics.numDeleteAlls.inc(); - return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.deleteAllNs) + return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs) .exceptionally(e -> { throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e); }); @@ -169,10 +173,10 @@ public class 
RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem @Override public void flush() { try { - writeMetrics.numFlushes.inc(); + incCounter(writeMetrics.numFlushes); long startNs = System.nanoTime(); writeFn.flush(); - writeMetrics.flushNs.update(System.nanoTime() - startNs); + updateTimer(writeMetrics.flushNs, System.nanoTime() - startNs); } catch (Exception e) { String errMsg = "Failed to flush remote store"; logger.error(errMsg, e); @@ -186,6 +190,48 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem super.close(); } + /** + * Execute an async request given a table record (key+value) + * @param rateLimiter helper for rate limiting + * @param key key of the table record + * @param value value of the table record + * @param method method to be executed + * @param timer latency metric to be updated + * @return CompletableFuture of the operation + */ + protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter, + K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) { + incCounter(counter); + final long startNs = System.nanoTime(); + CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() + ? 
CompletableFuture + .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor) + .thenCompose((r) -> method.apply(key, value)) + : method.apply(key, value); + return completeExecution(ioFuture, startNs, timer); + } + + /** + * Execute an async request given a collection of table records + * @param rateLimiter helper for rate limiting + * @param records list of records + * @param method method to be executed + * @param timer latency metric to be updated + * @return CompletableFuture of the operation + */ + protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter, + Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, + Counter counter, Timer timer) { + incCounter(counter); + final long startNs = System.nanoTime(); + CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() + ? CompletableFuture + .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor) + .thenCompose((r) -> method.apply(records)) + : method.apply(records); + return completeExecution(ioFuture, startNs, timer); + } + @VisibleForTesting public TableWriteFunction<K, V> getWriteFn() { return writeFn; http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index 1b6bfea..e02650e 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -23,14 +23,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Objects; import org.apache.samza.SamzaException; +import org.apache.samza.config.MetricsConfig; 
import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Timer; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.utils.DefaultTableReadMetrics; +import org.apache.samza.table.BaseReadableTable; import org.apache.samza.table.utils.TableMetricsUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -38,9 +36,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.function.BiFunction; import java.util.function.Function; +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; /** * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service. @@ -71,18 +70,13 @@ import java.util.function.Function; * @param <K> the type of the key in this table * @param <V> the type of the value in this table */ -public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { - - protected final String tableId; - protected final Logger logger; +public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> { protected final ExecutorService callbackExecutor; protected final ExecutorService tableExecutor; protected final TableReadFunction<K, V> readFn; protected final TableRateLimiter<K, V> readRateLimiter; - private DefaultTableReadMetrics readMetrics; - /** * Construct a RemoteReadableTable instance * @param tableId table id @@ -93,21 +87,22 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { */ public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn, TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) { - Preconditions.checkArgument(tableId != null && 
!tableId.isEmpty(), "invalid table id"); + super(tableId); Preconditions.checkNotNull(readFn, "null read function"); - this.tableId = tableId; this.readFn = readFn; this.readRateLimiter = rateLimiter; this.callbackExecutor = callbackExecutor; this.tableExecutor = tableExecutor; - this.logger = LoggerFactory.getLogger(getClass().getName() + "-" + tableId); } @Override public void init(Context context) { - readMetrics = new DefaultTableReadMetrics(context, this, tableId); - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); - readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns")); + super.init(context); + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + if (metricsConfig.getMetricsTimerEnabled()) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId); + readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns")); + } } @Override @@ -122,14 +117,13 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { @Override public CompletableFuture<V> getAsync(K key) { Preconditions.checkNotNull(key); - readMetrics.numGets.inc(); - return execute(readRateLimiter, key, readFn::getAsync, readMetrics.getNs) + return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs) .handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to get the records for " + key, e); } if (result == null) { - readMetrics.numMissedLookups.inc(); + incCounter(readMetrics.numMissedLookups); } return result; }); @@ -137,7 +131,6 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { @Override public Map<K, V> getAll(List<K> keys) { - readMetrics.numGetAlls.inc(); try { return getAllAsync(keys).get(); } catch (Exception e) { @@ -151,13 +144,12 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { if (keys.isEmpty()) { return 
CompletableFuture.completedFuture(Collections.EMPTY_MAP); } - readMetrics.numGetAlls.inc(); - return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.getAllNs) + return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs) .handle((result, e) -> { if (e != null) { throw new SamzaException("Failed to get the records for " + keys, e); } - result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc()); + result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups)); return result; }); } @@ -172,56 +164,15 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { * @return CompletableFuture of the operation */ protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, - K key, Function<K, CompletableFuture<T>> method, Timer timer) { + K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) { + incCounter(counter); final long startNs = System.nanoTime(); - CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ? - CompletableFuture + CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() + ? 
CompletableFuture .runAsync(() -> rateLimiter.throttle(key), tableExecutor) - .thenCompose((r) -> method.apply(key)) : - method.apply(key); - if (callbackExecutor != null) { - ioFuture.thenApplyAsync(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }, callbackExecutor); - } else { - ioFuture.thenApply(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }); - } - return ioFuture; - } - - /** - * Execute an async request given a table record (key+value) - * @param rateLimiter helper for rate limiting - * @param key key of the table record - * @param value value of the table record - * @param method method to be executed - * @param timer latency metric to be updated - * @return CompletableFuture of the operation - */ - protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter, - K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Timer timer) { - final long startNs = System.nanoTime(); - CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited() ? 
- CompletableFuture - .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor) - .thenCompose((r) -> method.apply(key, value)) : - method.apply(key, value); - if (callbackExecutor != null) { - ioFuture.thenApplyAsync(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }, callbackExecutor); - } else { - ioFuture.thenApply(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }); - } - return ioFuture; + .thenCompose((r) -> method.apply(key)) + : method.apply(key); + return completeExecution(ioFuture, startNs, timer); } /** @@ -234,54 +185,34 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> { * @return CompletableFuture of the operation */ protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter, - Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Timer timer) { + Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) { + incCounter(counter); final long startNs = System.nanoTime(); - CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() ? - CompletableFuture + CompletableFuture<T> ioFuture = rateLimiter.isRateLimited() + ? 
CompletableFuture .runAsync(() -> rateLimiter.throttle(keys), tableExecutor) - .thenCompose((r) -> method.apply(keys)) : - method.apply(keys); - if (callbackExecutor != null) { - ioFuture.thenApplyAsync(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }, callbackExecutor); - } else { - ioFuture.thenApply(r -> { - timer.update(System.nanoTime() - startNs); - return r; - }); - } - return ioFuture; + .thenCompose((r) -> method.apply(keys)) + : method.apply(keys); + return completeExecution(ioFuture, startNs, timer); } /** - * Execute an async request given a collection of table records - * @param rateLimiter helper for rate limiting - * @param records list of records - * @param method method to be executed + * Complete the pending execution and update timer + * @param ioFuture the future to be executed + * @param startNs start time in nanosecond * @param timer latency metric to be updated + * @param <T> return type * @return CompletableFuture of the operation */ - protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter, - Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method, Timer timer) { - final long startNs = System.nanoTime(); - CompletableFuture<Void> ioFuture; - if (rateLimiter.isRateLimited()) { - ioFuture = CompletableFuture - .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor) - .thenCompose((r) -> method.apply(records)); - } else { - ioFuture = method.apply(records); - } + protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) { if (callbackExecutor != null) { ioFuture.thenApplyAsync(r -> { - timer.update(System.nanoTime() - startNs); + updateTimer(timer, System.nanoTime() - startNs); return r; }, callbackExecutor); } else { ioFuture.thenApply(r -> { - timer.update(System.nanoTime() - startNs); + updateTimer(timer, System.nanoTime() - startNs); return r; }); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java deleted file mode 100644 index 525a0cb..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.table.utils; - -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.Table; - - -/** - * Utility class that contains the default set of read metrics. 
- */ -public class DefaultTableReadMetrics { - - public final Timer getNs; - public final Timer getAllNs; - public final Counter numGets; - public final Counter numGetAlls; - public final Timer getCallbackNs; - public final Counter numMissedLookups; - - /** - * Constructor based on container and task container context - * - * @param context {@link Context} for this task - * @param table underlying table - * @param tableId table Id - */ - public DefaultTableReadMetrics(Context context, Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); - getNs = tableMetricsUtil.newTimer("get-ns"); - getAllNs = tableMetricsUtil.newTimer("getAll-ns"); - numGets = tableMetricsUtil.newCounter("num-gets"); - numGetAlls = tableMetricsUtil.newCounter("num-getAlls"); - getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns"); - numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups"); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java deleted file mode 100644 index 69d4ef2..0000000 --- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.table.utils; - -import org.apache.samza.context.Context; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.Timer; -import org.apache.samza.table.Table; - - -public class DefaultTableWriteMetrics { - - public final Timer putNs; - public final Timer putAllNs; - public final Timer deleteNs; - public final Timer deleteAllNs; - public final Timer flushNs; - public final Counter numPuts; - public final Counter numPutAlls; - public final Counter numDeletes; - public final Counter numDeleteAlls; - public final Counter numFlushes; - public final Timer putCallbackNs; - public final Timer deleteCallbackNs; - - /** - * Utility class that contains the default set of write metrics. 
- * - * @param context {@link Context} for this task - * @param table underlying table - * @param tableId table Id - */ - public DefaultTableWriteMetrics(Context context, Table table, String tableId) { - TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); - putNs = tableMetricsUtil.newTimer("put-ns"); - putAllNs = tableMetricsUtil.newTimer("putAll-ns"); - deleteNs = tableMetricsUtil.newTimer("delete-ns"); - deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); - flushNs = tableMetricsUtil.newTimer("flush-ns"); - numPuts = tableMetricsUtil.newCounter("num-puts"); - numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); - numDeletes = tableMetricsUtil.newCounter("num-deletes"); - numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); - numFlushes = tableMetricsUtil.newCounter("num-flushes"); - putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns"); - deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java index 1b19272..90d6fe8 100644 --- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java @@ -53,7 +53,7 @@ public class TableMetricsUtil { Preconditions.checkNotNull(table); Preconditions.checkNotNull(tableId); - this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry(); + this.metricsRegistry = context.getContainerContext().getContainerMetricsRegistry(); this.groupName = table.getClass().getSimpleName(); this.tableId = tableId; } @@ -87,6 +87,18 @@ public class TableMetricsUtil { return 
metricsRegistry.newGauge(groupName, new SupplierGauge(getMetricFullName(name), supplier)); } + public static void incCounter(Counter counter) { + if (counter != null) { + counter.inc(); + } + } + + public static void updateTimer(Timer timer, long duration) { + if (timer != null) { + timer.update(duration); + } + } + private String getMetricFullName(String name) { return String.format("%s-%s", tableId, name); } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java new file mode 100644 index 0000000..0775844 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.table.utils; + +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; + + +/** + * Utility class that contains the default set of read metrics. + */ +public class TableReadMetrics { + + public final Timer getNs; + public final Timer getAllNs; + public final Counter numGets; + public final Counter numGetAlls; + public final Timer getCallbackNs; + public final Counter numMissedLookups; + + /** + * Constructor based on container and task container context + * + * @param context {@link Context} for this task + * @param table underlying table + * @param tableId table Id + */ + public TableReadMetrics(Context context, Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); + numGets = tableMetricsUtil.newCounter("num-gets"); + numGetAlls = tableMetricsUtil.newCounter("num-getAlls"); + numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups"); + + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + if (metricsConfig.getMetricsTimerEnabled()) { + getNs = tableMetricsUtil.newTimer("get-ns"); + getAllNs = tableMetricsUtil.newTimer("getAll-ns"); + getCallbackNs = tableMetricsUtil.newTimer("get-callback-ns"); + } else { + getNs = null; + getAllNs = null; + getCallbackNs = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java new file mode 100644 index 0000000..02af35f --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.table.utils; + +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.Table; + + +public class TableWriteMetrics { + + public final Timer putNs; + public final Timer putAllNs; + public final Timer deleteNs; + public final Timer deleteAllNs; + public final Timer flushNs; + public final Counter numPuts; + public final Counter numPutAlls; + public final Counter numDeletes; + public final Counter numDeleteAlls; + public final Counter numFlushes; + public final Timer putCallbackNs; + public final Timer deleteCallbackNs; + + /** + * Utility class that contains the default set of write metrics. 
+ * + * @param context {@link Context} for this task + * @param table underlying table + * @param tableId table Id + */ + public TableWriteMetrics(Context context, Table table, String tableId) { + TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId); + numPuts = tableMetricsUtil.newCounter("num-puts"); + numPutAlls = tableMetricsUtil.newCounter("num-putAlls"); + numDeletes = tableMetricsUtil.newCounter("num-deletes"); + numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls"); + numFlushes = tableMetricsUtil.newCounter("num-flushes"); + + MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig()); + if (metricsConfig.getMetricsTimerEnabled()) { + putNs = tableMetricsUtil.newTimer("put-ns"); + putAllNs = tableMetricsUtil.newTimer("putAll-ns"); + deleteNs = tableMetricsUtil.newTimer("delete-ns"); + deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns"); + flushNs = tableMetricsUtil.newTimer("flush-ns"); + putCallbackNs = tableMetricsUtil.newTimer("put-callback-ns"); + deleteCallbackNs = tableMetricsUtil.newTimer("delete-callback-ns"); + } else { + putNs = null; + putAllNs = null; + deleteNs = null; + deleteAllNs = null; + flushNs = null; + putCallbackNs = null; + deleteCallbackNs = null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java index 2775ca7..bd3ef18 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.junit.Assert; import org.junit.Test; import 
com.google.common.collect.Sets; @@ -49,10 +50,30 @@ public class TestJavaTableConfig { @Test public void testGetTableProperties() { Map<String, String> map = new HashMap<>(); - map.put("tables.t1.spec", "t1-spec"); + map.put("stores.t1.key.serde", "key-serde"); + map.put("stores.t1.msg.serde", "msg-serde"); map.put("tables.t1.provider.factory", "t1-provider-factory"); JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map)); assertEquals("t1-provider-factory", tableConfig.getTableProviderFactory("t1")); + assertEquals("key-serde", tableConfig.getKeySerde("t1")); + assertEquals("msg-serde", tableConfig.getMsgSerde("t1")); } + @Test + public void testBuildKey() { + String key = JavaTableConfig.buildKey("t1", "abc"); + Assert.assertEquals("tables.t1.abc", key); + } + + @Test + public void testGetForTable() { + Map<String, String> map = new HashMap<>(); + map.put(JavaTableConfig.buildKey("t1", "abc"), "xyz"); + JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map)); + Assert.assertEquals("xyz", tableConfig.getForTable("t1", "abc")); + Assert.assertNull(tableConfig.getForTable("t1", "aaa")); + Assert.assertEquals("xyz", tableConfig.getForTable("t1", "aaa", "xyz")); + Assert.assertNull(tableConfig.getForTable("tt", "abc")); + Assert.assertEquals("xyz", tableConfig.getForTable("tt", "abc", "xyz")); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java index bfb329a..daaba46 100644 --- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java @@ -24,6 +24,7 @@ import com.google.common.cache.CacheBuilder; import 
org.apache.commons.lang3.tuple.Pair; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.MetricsConfig; import org.apache.samza.context.Context; import org.apache.samza.context.MockContext; import org.apache.samza.metrics.Counter; @@ -59,14 +60,13 @@ import java.util.concurrent.Executors; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; + public class TestCachingTable { + + private MetricsRegistry metricsRegistry; + @Test public void testSerializeSimple() { doTestSerialize(null); @@ -139,15 +139,23 @@ public class TestCachingTable { } private void initTables(ReadableTable ... tables) { + initTables(false, tables); + } + + private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... 
tables) { + Map<String, String> config = new HashMap<>(); + if (isTimerMetricsDisabled) { + config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + } Context context = new MockContext(); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + doReturn(new MapConfig(config)).when(context.getJobContext()).getConfig(); + metricsRegistry = mock(MetricsRegistry.class); doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry); - for (ReadableTable table : tables) { - table.init(context); - } + doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); + + Arrays.asList(tables).forEach(t -> t.init(context)); } private void doTestCacheOps(boolean isWriteAround) { @@ -183,7 +191,7 @@ public class TestCachingTable { return null; }).when(context.getTaskContext()).getTable(anyString()); - when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); + when(context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new NoOpMetricsRegistry()); Map<String, String> tableConfig = desc.toConfig(new MapConfig()); when(context.getJobContext().getConfig()).thenReturn(new MapConfig(tableConfig)); @@ -284,6 +292,20 @@ public class TestCachingTable { initTables(cachingTable, guavaTable, remoteTable); + // 3 per readable table (9) + // 5 per read/write table (15) + verify(metricsRegistry, times(24)).newCounter(any(), anyString()); + + // 3 per readable table (9) + // 7 per read/write table (21) + // 1 per remote readable table (1) + // 1 per remote read/write table (1) + verify(metricsRegistry, times(32)).newTimer(any(), anyString()); + + // 1 per guava table (1) + // 3 per caching table (2) + 
verify(metricsRegistry, times(4)).newGauge(anyString(), any()); + // GET doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(any()); Assert.assertEquals(cachingTable.getAsync("foo").get(), "bar"); @@ -364,6 +386,50 @@ public class TestCachingTable { Assert.assertNull(guavaCache.getIfPresent("foo3")); } + @Test + public void testTimerDisabled() throws Exception { + String tableId = "testTimerDisabled"; + + Cache<String, String> guavaCache = CacheBuilder.newBuilder().initialCapacity(100).build(); + final ReadWriteTable<String, String> guavaTable = new GuavaCacheTable<>(tableId, guavaCache); + + TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class); + + TableReadFunction<String, String> readFn = mock(TableReadFunction.class); + doReturn(CompletableFuture.completedFuture("")).when(readFn).getAsync(any()); + + TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class); + doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any()); + doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any()); + + final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>( + tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper, + Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()); + + final CachingTable<String, String> cachingTable = new CachingTable<>( + tableId, remoteTable, guavaTable, false); + + initTables(true, cachingTable, guavaTable, remoteTable); + + cachingTable.get(""); + cachingTable.getAsync("").get(); + cachingTable.getAll(Collections.emptyList()); + cachingTable.getAllAsync(Collections.emptyList()); + cachingTable.flush(); + cachingTable.put("", ""); + cachingTable.putAsync("", ""); + cachingTable.putAll(Collections.emptyList()); + cachingTable.putAllAsync(Collections.emptyList()); + cachingTable.delete(""); + cachingTable.deleteAsync(""); + cachingTable.deleteAll(Collections.emptyList()); + 
cachingTable.deleteAllAsync(Collections.emptyList()); + + verify(metricsRegistry, atLeast(1)).newCounter(any(), anyString()); + verify(metricsRegistry, atLeast(1)).newGauge(anyString(), any()); + verify(metricsRegistry, times(0)).newTimer(any(), anyString()); + } + private TableDescriptor createDummyTableDescriptor(String tableId) { BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class); when(tableDescriptor.getTableId()).thenReturn(tableId); http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java index d1369d0..d7733a8 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java @@ -62,7 +62,7 @@ public class TestRemoteReadWriteTable { doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry(); + doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry(); return context; } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java index 69e95d4..7a75c90 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java @@ -132,13 +132,16 @@ public class TestRemoteTableDescriptor { private Context createMockContext(TableDescriptor tableDescriptor) { Context context = mock(Context.class); - TaskContextImpl taskContext = mock(TaskContextImpl.class); - when(context.getTaskContext()).thenReturn(taskContext); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); when(metricsRegistry.newTimer(anyString(), anyString())).thenReturn(mock(Timer.class)); when(metricsRegistry.newCounter(anyString(), anyString())).thenReturn(mock(Counter.class)); - when(taskContext.getTaskMetricsRegistry()).thenReturn(metricsRegistry); + when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); + + TaskContextImpl taskContext = mock(TaskContextImpl.class); + when(context.getTaskContext()).thenReturn(taskContext); TaskName taskName = new TaskName("MyTask"); TaskModel taskModel = mock(TaskModel.class); @@ -147,10 +150,7 @@ public class TestRemoteTableDescriptor { ContainerModel containerModel = mock(ContainerModel.class); when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel)); - - ContainerContext containerContext = mock(ContainerContext.class); when(containerContext.getContainerModel()).thenReturn(containerModel); - when(context.getContainerContext()).thenReturn(containerContext); String containerId = "container-1"; JobModel jobModel = mock(JobModel.class); 
http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java index e0107f9..98d3768 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java @@ -20,9 +20,12 @@ package org.apache.samza.storage.kv; import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.samza.context.Context; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; import org.apache.samza.table.ReadWriteTable; -import org.apache.samza.table.utils.DefaultTableWriteMetrics; + +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; /** @@ -34,8 +37,6 @@ import org.apache.samza.table.utils.DefaultTableWriteMetrics; public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> implements ReadWriteTable<K, V> { - protected DefaultTableWriteMetrics writeMetrics; - /** * Constructs an instance of {@link LocalReadWriteTable} * @param tableId the table Id @@ -46,18 +47,9 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> } @Override - public void init(Context context) { - super.init(context); - writeMetrics = new DefaultTableWriteMetrics(context, this, tableId); - } - - @Override public void put(K key, V value) { if (value != null) { - writeMetrics.numPuts.inc(); - long startNs = System.nanoTime(); - kvStore.put(key, value); - writeMetrics.putNs.update(System.nanoTime() - startNs); + instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value)); } else { delete(key); } @@ -77,10 +69,7 @@ 
public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> @Override public void putAll(List<Entry<K, V>> entries) { - writeMetrics.numPutAlls.inc(); - long startNs = System.nanoTime(); - kvStore.putAll(entries); - writeMetrics.putAllNs.update(System.nanoTime() - startNs); + instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(entries)); } @Override @@ -97,10 +86,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> @Override public void delete(K key) { - writeMetrics.numDeletes.inc(); - long startNs = System.nanoTime(); - kvStore.delete(key); - writeMetrics.deleteNs.update(System.nanoTime() - startNs); + instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key)); } @Override @@ -117,10 +103,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> @Override public void deleteAll(List<K> keys) { - writeMetrics.numDeleteAlls.inc(); - long startNs = System.nanoTime(); - kvStore.deleteAll(keys); - writeMetrics.deleteAllNs.update(System.nanoTime() - startNs); + instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys)); } @Override @@ -137,10 +120,18 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V> @Override public void flush() { - writeMetrics.numFlushes.inc(); + instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush()); + } + + private interface Func0 { + void apply(); + } + + private void instrument(Counter counter, Timer timer, Func0 func) { + incCounter(counter); long startNs = System.nanoTime(); - kvStore.flush(); - writeMetrics.flushNs.update(System.nanoTime() - startNs); + func.apply(); + updateTimer(timer, System.nanoTime() - startNs); } } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java ---------------------------------------------------------------------- diff --git 
a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java index f314918..ba0d3cf 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java @@ -19,14 +19,19 @@ package org.apache.samza.storage.kv; import com.google.common.base.Preconditions; + +import com.google.common.base.Supplier; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import org.apache.samza.context.Context; -import org.apache.samza.table.ReadableTable; -import org.apache.samza.table.utils.DefaultTableReadMetrics; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.BaseReadableTable; + +import static org.apache.samza.table.utils.TableMetricsUtil.incCounter; +import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer; /** * A store backed readable table @@ -34,12 +39,9 @@ import org.apache.samza.table.utils.DefaultTableReadMetrics; * @param <K> the type of the key in this table * @param <V> the type of the value in this table */ -public class LocalReadableTable<K, V> implements ReadableTable<K, V> { +public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> { protected final KeyValueStore<K, V> kvStore; - protected final String tableId; - - protected DefaultTableReadMetrics readMetrics; /** * Constructs an instance of {@link LocalReadableTable} @@ -47,25 +49,16 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> { * @param kvStore the backing store */ public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) { - Preconditions.checkArgument(tableId != null & !tableId.isEmpty() , "invalid tableId"); + super(tableId); Preconditions.checkNotNull(kvStore, "null KeyValueStore"); - this.tableId = tableId; this.kvStore = 
kvStore; } @Override - public void init(Context context) { - readMetrics = new DefaultTableReadMetrics(context, this, tableId); - } - - @Override public V get(K key) { - readMetrics.numGets.inc(); - long startNs = System.nanoTime(); - V result = kvStore.get(key); - readMetrics.getNs.update(System.nanoTime() - startNs); + V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key)); if (result == null) { - readMetrics.numMissedLookups.inc(); + incCounter(readMetrics.numMissedLookups); } return result; } @@ -83,11 +76,8 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> { @Override public Map<K, V> getAll(List<K> keys) { - readMetrics.numGetAlls.inc(); - long startNs = System.nanoTime(); - Map<K, V> result = kvStore.getAll(keys); - readMetrics.getAllNs.update(System.nanoTime() - startNs); - result.values().stream().filter(Objects::isNull).map(v -> readMetrics.numMissedLookups.inc()); + Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys)); + result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups)); return result; } @@ -107,4 +97,12 @@ public class LocalReadableTable<K, V> implements ReadableTable<K, V> { // The KV store is not closed here as it may still be needed by downstream operators, // it will be closed by the SamzaContainer } + + private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) { + incCounter(counter); + long startNs = System.nanoTime(); + T result = func.get(); + updateTimer(timer, System.nanoTime() - startNs); + return result; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java new file mode 100644 index 0000000..5531951 --- /dev/null +++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.storage.kv; + +import java.util.Collections; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.MetricsConfig; +import org.apache.samza.context.ContainerContext; +import org.apache.samza.context.Context; +import org.apache.samza.context.JobContext; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.ReadWriteTable; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + + +public class TestLocalReadWriteTable { + + public static final String TABLE_ID = "t1"; + + private Timer putNs; + private Timer putAllNs; + private Timer deleteNs; + private Timer deleteAllNs; + private Timer flushNs; + private Counter numPuts; + private Counter numPutAlls; + private Counter numDeletes; + private Counter numDeleteAlls; + private Counter numFlushes; + private Timer putCallbackNs; + private Timer deleteCallbackNs; + + private MetricsRegistry metricsRegistry; + + private KeyValueStore kvStore; + + @Before + public void setUp() { + + putNs = new Timer(""); + putAllNs = new Timer(""); + deleteNs = new Timer(""); + deleteAllNs = new Timer(""); + flushNs = new Timer(""); + numPuts = new Counter(""); + numPutAlls = new Counter(""); + numDeletes = new Counter(""); + numDeleteAlls = new Counter(""); + numFlushes = new Counter(""); + putCallbackNs = new Timer(""); + deleteCallbackNs = new Timer(""); + + metricsRegistry = mock(MetricsRegistry.class); + String groupName = LocalReadWriteTable.class.getSimpleName(); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs); + 
when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls); + when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs); + when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs); + + kvStore = mock(KeyValueStore.class); + } + + @Test + public void testPut() throws Exception { + ReadWriteTable table = createTable(false); + table.put("k1", "v1"); + table.putAsync("k2", "v2").get(); + verify(kvStore, times(2)).put(any(), any()); + Assert.assertEquals(2, numPuts.getCount()); + Assert.assertTrue(putNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testPutAll() throws Exception { + ReadWriteTable table = createTable(false); + 
table.putAll(Collections.emptyList()); + table.putAllAsync(Collections.emptyList()).get(); + verify(kvStore, times(2)).putAll(any()); + Assert.assertEquals(2, numPutAlls.getCount()); + Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testDelete() throws Exception { + ReadWriteTable table = createTable(false); + table.delete(""); + table.deleteAsync("").get(); + verify(kvStore, times(2)).delete(any()); + Assert.assertEquals(2, numDeletes.getCount()); + Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testDeleteAll() throws Exception { + ReadWriteTable table = createTable(false); + table.deleteAll(Collections.emptyList()); + 
table.deleteAllAsync(Collections.emptyList()).get(); + verify(kvStore, times(2)).deleteAll(any()); + Assert.assertEquals(2, numDeleteAlls.getCount()); + Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numFlushes.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testFlush() { + ReadWriteTable table = createTable(false); + table.flush(); + table.flush(); + verify(kvStore, times(2)).flush(); + Assert.assertEquals(2, numFlushes.getCount()); + Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, numPuts.getCount()); + Assert.assertEquals(0, numPutAlls.getCount()); + Assert.assertEquals(0, numDeletes.getCount()); + Assert.assertEquals(0, numDeleteAlls.getCount()); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + @Test + public void testTimerDisabled() throws Exception { + ReadWriteTable table = createTable(true); + table.put("", ""); + table.putAsync("", "").get(); + table.putAll(Collections.emptyList()); + table.putAllAsync(Collections.emptyList()).get(); + 
table.delete(""); + table.deleteAsync("").get(); + table.deleteAll(Collections.emptyList()); + table.deleteAllAsync(Collections.emptyList()).get(); + table.flush(); + verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString()); + verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); + verify(metricsRegistry, times(0)).newGauge(anyString(), any()); + Assert.assertEquals(1, numFlushes.getCount()); + Assert.assertEquals(2, numPuts.getCount()); + Assert.assertEquals(2, numPutAlls.getCount()); + Assert.assertEquals(2, numDeletes.getCount()); + Assert.assertEquals(2, numDeleteAlls.getCount()); + Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001); + Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001); + } + + private LocalReadWriteTable createTable(boolean isTimerDisabled) { + Map<String, String> config = new HashMap<>(); + if (isTimerDisabled) { + config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); + } + Context context = mock(Context.class); + JobContext jobContext = mock(JobContext.class); + when(context.getJobContext()).thenReturn(jobContext); + when(jobContext.getConfig()).thenReturn(new MapConfig(config)); + ContainerContext containerContext = mock(ContainerContext.class); + when(context.getContainerContext()).thenReturn(containerContext); + when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); + + LocalReadWriteTable table = new LocalReadWriteTable("t1", kvStore); + table.init(context); + + return table; + } +}
