Repository: samza Updated Branches: refs/heads/master 46263677d -> 014a59c68
SAMZA-1174; Profiling state store performance This is a commit for [SAMZA-1174](https://issues.apache.org/jira/browse/SAMZA-1174). This commit involves gathering a log of operations (read, write, delete, etc.) happening on the state and publishing them into a Kafka topic. It is named "Access Log" and behaves similarly to the changelog, but gathers log information. Author: jmehar2 <jmeh...@illinois.edu> Author: Jayasi Mehar <jayas...@gmail.com> Author: s-noghabi <abdol...@illinois.edu> Reviewers: Prateek Maheshwari <pmahe...@linkedin.com>, Jagadish V <jagad...@apache.org> Closes #132 from s-noghabi/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/014a59c6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/014a59c6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/014a59c6 Branch: refs/heads/master Commit: 014a59c68b792c8e63639c6f16257c41f0f4e0c6 Parents: 4626367 Author: jmehar2 <jmeh...@illinois.edu> Authored: Thu May 11 12:56:31 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Thu May 11 12:56:31 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/StorageConfig.scala | 18 +++ .../samza/coordinator/JobModelManager.scala | 24 +++ .../apache/samza/serializers/SerdeManager.scala | 13 +- .../samza/storage/kv/AccessLogMessage.scala | 44 ++++++ .../samza/storage/kv/AccessLoggedStore.scala | 155 +++++++++++++++++++ .../kv/BaseKeyValueStorageEngineFactory.scala | 9 +- 6 files changed, 258 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index 8dbf739..0e3d568 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -35,6 +35,11 @@ object StorageConfig { val CHANGELOG_SYSTEM = "job.changelog.system" val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms" val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1) + val ACCESSLOG_STREAM_SUFFIX = "access-log" + val ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio" + val ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled" + val DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50 + implicit def Config2Storage(config: Config) = new StorageConfig(config) } @@ -45,11 +50,24 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name) def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name) + def getAccessLogEnabled(storeName: String) = { + getBoolean(ACCESSLOG_ENABLED format storeName, false) + } + def getChangelogStream(name: String) = { val javaStorageConfig = new JavaStorageConfig(config) Option(javaStorageConfig.getChangelogStream(name)) } + //Returns the accesslog stream name given a changelog stream name + def getAccessLogStream(changeLogStream: String) = { + changeLogStream + "-" + ACCESSLOG_STREAM_SUFFIX + } + + def getAccessLogSamplingRatio(storeName: String) = { + getInt(ACCESSLOG_SAMPLING_RATIO format storeName, DEFAULT_ACCESSLOG_SAMPLING_RATIO) + } + def getChangeLogDeleteRetentionInMs(storeName: String) = { getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS) } http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- 
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index dda0b6b..353e297 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -48,6 +48,7 @@ import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.SystemStreamPartitionMatcher import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.StreamSpec import org.apache.samza.util.Logging import org.apache.samza.util.Util import org.apache.samza.Partition @@ -138,6 +139,7 @@ object JobModelManager extends Logging { changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava) createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions) + createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions) jobModelManager } @@ -298,6 +300,28 @@ object JobModelManager extends Logging { } } + private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = { + val changeLogSystemStreams = config + .getStoreNames + .filter(config.getChangelogStream(_).isDefined) + .map(name => (name, config.getChangelogStream(name).get)).toMap + .mapValues(Util.getSystemStreamFromNames(_)) + + for ((storeName, systemStream) <- changeLogSystemStreams) { + val accessLog = config.getAccessLogEnabled(storeName) + if (accessLog) { + val systemAdmin = Util.getObj[SystemFactory](config + .getSystemFactory(systemStream.getSystem) + .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemStream.getSystem)) + ).getAdmin(systemStream.getSystem, config) + + val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream), + config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions) + systemAdmin.createStream(accessLogSpec) + } + } + } + private def getSystemNames(config: Config) = config.getSystemNames.toSet } http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala index 066d894..4540bce 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala @@ -24,6 +24,7 @@ import org.apache.samza.config.SerializerConfig import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.config.StorageConfig class SerdeManager( serdes: Map[String, Serde[Object]] = Map(), @@ -38,7 +39,8 @@ class SerdeManager( .toBytes(obj) def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = { - val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)) { + val key = if (changeLogSystemStreams.contains(envelope.getSystemStream) + || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. 
envelope.getKey } else if (envelope.getKeySerializerName != null) { @@ -55,7 +57,8 @@ class SerdeManager( envelope.getKey } - val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)) { + val message = if (changeLogSystemStreams.contains(envelope.getSystemStream) + || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. envelope.getMessage } else if (envelope.getMessageSerializerName != null) { @@ -90,7 +93,8 @@ class SerdeManager( .fromBytes(bytes) def fromBytes(envelope: IncomingMessageEnvelope) = { - val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) { + val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream) + || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. envelope.getKey } else if (systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) { @@ -104,7 +108,8 @@ class SerdeManager( envelope.getKey } - val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) { + val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream) + || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) { // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde. 
envelope.getMessage } else if (systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) { http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala new file mode 100644 index 0000000..dde5599 --- /dev/null +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage.kv + +import java.io.Serializable +import java.io.ByteArrayOutputStream +import java.io.ObjectOutputStream +import java.util.ArrayList + +class AccessLogMessage(val DBOperation: Int, + val duration: Long, + val keys: ArrayList[Array[Byte]], + val timestamp: Long = System.currentTimeMillis() + ) extends Serializable { + + + def serialize() : Array[Byte] = { + val byteStream = new ByteArrayOutputStream() + val outputStream = new ObjectOutputStream(byteStream) + outputStream.writeObject(this) + outputStream.close + val obj: Array[Byte] = byteStream.toByteArray + byteStream.close + return obj + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala new file mode 100644 index 0000000..c21c9a6 --- /dev/null +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage.kv + + +import java.util +import org.apache.samza.config.StorageConfig +import org.apache.samza.task.MessageCollector +import org.apache.samza.util.Logging +import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, SystemStreamPartition} +import org.apache.samza.serializers._ + +class AccessLoggedStore[K, V]( + val store: KeyValueStore[K, V], + val collector: MessageCollector, + val changelogSystemStreamPartition: SystemStreamPartition, + val storageConfig: StorageConfig, + val storeName: String, + val keySerde: Serde[K]) extends KeyValueStore[K, V] with Logging { + + object DBOperation extends Enumeration { + type DBOperation = Int + val READ = 1 + val WRITE = 2 + val DELETE = 3 + val RANGE = 4 + } + + val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream) + val systemStream = new SystemStream(changelogSystemStreamPartition.getSystemStream.getSystem, streamName) + val partitionId: Int = changelogSystemStreamPartition.getPartition.getPartitionId + val serializer = new LongSerde() + val samplingRatio = storageConfig.getAccessLogSamplingRatio(storeName) + val rng = scala.util.Random + + def get(key: K): V = { + val list = new util.ArrayList[Array[Byte]] + list.add(toBytesOrNull(key)) + logAccess(DBOperation.READ, list, store.get(key)) + } + + def getAll(keys: util.List[K]): util.Map[K, V] = { + logAccess(DBOperation.READ, serializeKeys(keys), store.getAll(keys)) + } + + def put(key: K, value: V): Unit = { + val list = new util.ArrayList[Array[Byte]] + list.add(toBytesOrNull(key)) + logAccess(DBOperation.WRITE, list, store.put(key, value)) + } + + def putAll(entries: util.List[Entry[K, V]]): Unit = { + logAccess(DBOperation.WRITE, serializeKeysFromEntries(entries), store.putAll(entries)) + } + + def delete(key: K): Unit = { + val list = new util.ArrayList[Array[Byte]] + list.add(toBytesOrNull(key)) + logAccess(DBOperation.DELETE, list, store.delete(key)) + } + + 
def deleteAll(keys: util.List[K]): Unit = { + logAccess(DBOperation.DELETE, serializeKeys(keys), store.deleteAll(keys)) + } + + def range(from: K, to: K): KeyValueIterator[K, V] = { + val list : util.ArrayList[K] = new util.ArrayList[K]() + list.add(from) + list.add(to) + logAccess(DBOperation.RANGE, serializeKeys(list), store.range(from, to)) + } + + def all(): KeyValueIterator[K, V] = { + store.all() + } + + def close(): Unit = { + trace("Closing accessLogged store.") + + store.close + } + + def flush(): Unit = { + trace("Flushing store.") + + store.flush + trace("Flushed store.") + } + + + def serializeKeys(keys: util.List[K]): util.ArrayList[Array[Byte]] = { + val keysInBytes = new util.ArrayList[Array[Byte]] + val iter = keys.iterator + if (iter != null) + while(iter.hasNext()) { + val entry = iter.next() + keysInBytes.add(toBytesOrNull(entry)) + } + + keysInBytes + } + + def serializeKeysFromEntries(list: util.List[Entry[K, V]]): util.ArrayList[Array[Byte]] = { + val keysInBytes = new util.ArrayList[Array[Byte]] + val iter = list.iterator + if (iter != null) + while(iter.hasNext()) { + val entry = iter.next().getKey + keysInBytes.add(toBytesOrNull(entry)) + } + + keysInBytes + } + + private def logAccess[R](dBOperation: Int, keys: util.ArrayList[Array[Byte]], + block: => R):R = { + val startTimeNs = System.nanoTime() + val result = block + val endTimeNs = System.nanoTime() + if (rng.nextInt() < samplingRatio) { + val duration = endTimeNs - startTimeNs + val timeStamp = System.currentTimeMillis() + val message = new AccessLogMessage(dBOperation, duration, keys) + collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, serializer.toBytes(timeStamp), message.serialize())) + } + + result + } + + def toBytesOrNull(key: K): Array[Byte] = { + if (key == null) { + return null + } + val bytes = keySerde.toBytes(key) + bytes + } + +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index 8ffc817..e3a2970 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -82,6 +82,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) val storeFactory = storageConfig.get("factory") var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder() + val accessLog = storageConfig.getBoolean("accesslog.enabled", false) if (storeFactory == null) { throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!") @@ -129,8 +130,14 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] serialized } + val maybeAccessLoggedStore = if (accessLog) { + new AccessLoggedStore(maybeCachedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde) + } else { + maybeCachedStore + } + // wrap with null value checking - val nullSafeStore = new NullSafeKeyValueStore(maybeCachedStore) + val nullSafeStore = new NullSafeKeyValueStore(maybeAccessLoggedStore) // create the storage engine and return // TODO: Decide if we should use raw bytes when restoring