Updated Branches: refs/heads/master 819b9960a -> bb0abb670
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala index a1b547d..d3c1ae8 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala @@ -29,56 +29,74 @@ import grizzled.slf4j.Logging /** * A key/value store decorator that adds a changelog for any changes made to the underlying store */ -class LoggedStore[K,V](val store: KeyValueStore[K, V], - val systemStreamPartition: SystemStreamPartition, - val collector: MessageCollector) extends KeyValueStore[K, V] with Logging { +class LoggedStore[K, V]( + val store: KeyValueStore[K, V], + val systemStreamPartition: SystemStreamPartition, + val collector: MessageCollector, + val metrics: LoggedStoreMetrics = new LoggedStoreMetrics) extends KeyValueStore[K, V] with Logging { val systemStream = systemStreamPartition.getSystemStream val partitionId = systemStreamPartition.getPartition.getPartitionId /* pass through methods */ - def get(key: K) = store.get(key) - def range(from: K, to: K) = store.range(from, to) - def all() = store.all() - + def get(key: K) = { + metrics.gets.inc + store.get(key) + } + + def range(from: K, to: K) = { + metrics.ranges.inc + store.range(from, to) + } + + def all() = { + metrics.alls.inc + store.all() + } + /** * Perform the local update and log it out to the changelog */ def put(key: K, value: V) { + metrics.puts.inc store.put(key, value) collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, value)) } - + /** * Perform multiple local updates and log out all changes to the changelog */ - def putAll(entries: java.util.List[Entry[K,V]]) { + def putAll(entries: java.util.List[Entry[K, V]]) { + metrics.puts.inc(entries.size) store.putAll(entries) val iter = entries.iterator - while(iter.hasNext) { + while (iter.hasNext) { val curr = iter.next collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, curr.getKey, curr.getValue)) } } - + /** - * Perform the local delete and log it out to the changelog + * Perform the local delete and log it out to the changelog */ def delete(key: K) { + metrics.deletes.inc store.delete(key) collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, key, null)) } - + def flush { trace("Flushing.") + metrics.flushes.inc + store.flush } - + def close { trace("Closing.") store.close } - + } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala new file mode 100644 index 0000000..ea56e8c --- /dev/null +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStoreMetrics.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.metrics.Counter +import org.apache.samza.metrics.MetricsHelper + +class LoggedStoreMetrics( + val storeName: String = "unknown", + val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { + + val gets = newCounter("gets") + val ranges = newCounter("ranges") + val alls = newCounter("alls") + val puts = newCounter("puts") + val deletes = newCounter("deletes") + val flushes = newCounter("flushes") + + override def getPrefix = storeName + "-" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index 75fd414..7b4db82 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -29,47 +29,65 @@ import grizzled.slf4j.Logging class SerializedKeyValueStore[K, V]( store: KeyValueStore[Array[Byte], Array[Byte]], keySerde: Serde[K], - msgSerde: Serde[V]) extends KeyValueStore[K, V] with Logging { + msgSerde: Serde[V], + metrics: SerializedKeyValueStoreMetrics = new SerializedKeyValueStoreMetrics) extends KeyValueStore[K, V] with Logging { def get(key: K): V = { val keyBytes = keySerde.toBytes(key) val found = store.get(keyBytes) - if (found == null) + metrics.gets.inc + metrics.bytesSerialized.inc(keyBytes.size) + if (found == null) { null.asInstanceOf[V] - else + } else { + metrics.bytesDeserialized.inc(found.size) msgSerde.fromBytes(found).asInstanceOf[V] + } } def put(key: K, value: V) { + metrics.puts.inc val keyBytes = keySerde.toBytes(key) val valBytes = msgSerde.toBytes(value) + metrics.bytesSerialized.inc(keyBytes.size + valBytes.size) store.put(keyBytes, valBytes) } def putAll(entries: java.util.List[Entry[K, V]]) { val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size()) val iter = entries.iterator + var bytesSerialized = 0L while (iter.hasNext) { val curr = iter.next val keyBytes = keySerde.toBytes(curr.getKey) val valBytes = msgSerde.toBytes(curr.getValue) + bytesSerialized += keyBytes.size + valBytes.size list.add(new Entry(keyBytes, valBytes)) } store.putAll(list) + metrics.puts.inc(list.size) + metrics.bytesSerialized.inc(bytesSerialized) } def delete(key: K) { - store.delete(keySerde.toBytes(key)) + metrics.deletes.inc + val keyBytes = keySerde.toBytes(key) + metrics.bytesSerialized.inc(keyBytes.size) + store.delete(keyBytes) } def range(from: K, to: K): KeyValueIterator[K, V] = { + metrics.ranges.inc val fromBytes = keySerde.toBytes(from) val toBytes = keySerde.toBytes(to) + metrics.bytesSerialized.inc(fromBytes.size + toBytes.size) new DeserializingIterator(store.range(fromBytes, toBytes)) } - def all(): KeyValueIterator[K, V] = + def all(): KeyValueIterator[K, V] = { + metrics.alls.inc new DeserializingIterator(store.all) + } private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] { def hasNext() = iter.hasNext() @@ -77,13 +95,21 @@ class SerializedKeyValueStore[K, V]( def close() = iter.close() def next(): Entry[K, V] = { val nxt = iter.next() - new Entry(keySerde.fromBytes(nxt.getKey()).asInstanceOf[K], msgSerde.fromBytes(nxt.getValue()).asInstanceOf[V]) + val keyBytes = nxt.getKey + val valBytes = nxt.getValue + metrics.bytesDeserialized.inc(keyBytes.size) + if (valBytes != null) { + metrics.bytesDeserialized.inc(valBytes.size) + } + new Entry(keySerde.fromBytes(keyBytes).asInstanceOf[K], msgSerde.fromBytes(valBytes).asInstanceOf[V]) } } def flush { trace("Flushing.") + metrics.flushes.inc + store.flush } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala new file mode 100644 index 0000000..2ad21c8 --- /dev/null +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.kv + +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.metrics.Counter +import org.apache.samza.metrics.MetricsHelper + +class SerializedKeyValueStoreMetrics( + val storeName: String = "unknown", + val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { + + val gets = newCounter("gets") + val ranges = newCounter("ranges") + val alls = newCounter("alls") + val puts = newCounter("puts") + val deletes = newCounter("deletes") + val flushes = newCounter("flushes") + val bytesSerialized = newCounter("bytes-serialized") + val bytesDeserialized = newCounter("bytes-deserialized") + + override def getPrefix = storeName + "-" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/bb0abb67/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index 4f6edfb..983771d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -34,10 +34,9 @@ import org.apache.samza.SamzaException import java.util.Timer import java.util.TimerTask import org.apache.samza.task.ReadableCollector +import org.apache.samza.metrics.MetricsHelper object SamzaAppMasterMetrics { - val metricsGroup = "samza.yarn.am" - val sourceName = "ApplicationMaster" } @@ -46,11 +45,13 @@ object SamzaAppMasterMetrics { * registry, we might as well use it. This class takes Samza's application * master state, and converts it to metrics. */ -class SamzaAppMasterMetrics(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry) extends YarnAppMasterListener with Logging { - import SamzaAppMasterMetrics._ +class SamzaAppMasterMetrics( + val config: Config, + val state: SamzaAppMasterState, + val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging { val jvm = new JvmMetrics(registry) - val mEventLoops = registry.newCounter(metricsGroup, "EventLoops") + val mEventLoops = newCounter("event-loops") val reporters = config.getMetricReporterNames.map(reporterName => { val metricsFactoryClassName = config .getMetricsFactoryClass(reporterName) @@ -66,17 +67,18 @@ class SamzaAppMasterMetrics(config: Config, state: SamzaAppMasterState, registry }).toMap override def onInit() { - val mRunningContainers = registry.newGauge(metricsGroup, "RunningContainers", { state.runningTasks.size }) - val mNeededContainers = registry.newGauge(metricsGroup, "NeededContainers", { state.neededContainers }) - val mCompletedContainers = registry.newGauge(metricsGroup, "CompletedContainers", { state.completedTasks }) - val mFailedContainers = registry.newGauge(metricsGroup, "FailedContainers", { state.failedContainers }) - val mReleasedContainers = registry.newGauge(metricsGroup, "ReleasedContainers", { state.releasedContainers }) - val mTasks = registry.newGauge(metricsGroup, "TaskCount", { state.taskCount }) - val mHost = registry.newGauge(metricsGroup, "HttpHost", { state.nodeHost }) - val mTrackingPort = registry.newGauge(metricsGroup, "HttpPort", { state.trackingPort }) - val mRpcPort = registry.newGauge(metricsGroup, "RpcPort", { state.rpcPort }) - val mAppAttemptId = registry.newGauge(metricsGroup, "AppAttemptId", { state.appAttemptId.toString }) + val mRunningContainers = newGauge("running-containers", () => state.runningTasks.size) + val mNeededContainers = newGauge("needed-containers", () => state.neededContainers) + val mCompletedContainers = newGauge("completed-containers", () => state.completedTasks) + val mFailedContainers = newGauge("failed-containers", () => state.failedContainers) + val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers) + val mTasks = newGauge("task-count", () => state.taskCount) + val mHost = newGauge("http-host", () => state.nodeHost) + val mTrackingPort = newGauge("http-port", () => state.trackingPort) + val mRpcPort = newGauge("rpc-port", () => state.rpcPort) + val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString) + jvm.start reporters.values.foreach(_.start) }
