Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160250982

--- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from a blocking queue.
+ *
+ * <p>A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache,
+ * older entries are evicted from the cache and need to be written to the database. This happens via the
+ * handleEvictedMetadata() method callback.
+ */
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+    private RocksDbStore store;
+    private BlockingQueue queue;
+    private WritableStringMetadataCache stringMetadataCache;
+    private Set<Integer> unusedIds = new HashSet<>();
+    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
+    private WriteOptions writeOpts = new WriteOptions();
+    private volatile boolean shutdown = false;
+    private Meter failureMeter;
+    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
+
+    /**
+     * Constructor for the RocksDbMetricsWriter.
+     *
+     * @param store         The RocksDB store
+     * @param queue         The queue to receive metrics for insertion
+     * @param failureMeter  The meter to mark when a metric insert fails
+     */
+    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) {
+        this.store = store;
+        this.queue = queue;
+        this.failureMeter = failureMeter;
+
+        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+    }
+
+    /**
+     * Init routine called once the Metadata cache has been created.
+     *
+     * @throws MetricException on cache error
+     */
+    void init() throws MetricException {
+        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
+    }
+
+    /**
+     * Run routine to wait for metrics on a queue and insert into RocksDB.
+     */
+    @Override
+    public void run() {
+        while (true) {
+            if (shutdown) {
+                return;
+            }
+            try {
+                Metric m = (Metric) queue.take();
+                processInsert(m);
+            } catch (Exception e) {
+                LOG.error("Failed to insert metric", e);
+                if (this.failureMeter != null) {
+                    this.failureMeter.mark();
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs the actual metric insert, and aggregates over all bucket times.
+     *
+     * @param metric  Metric to store
+     * @throws MetricException  if database write fails
+     */
+    private void processInsert(Metric metric) throws MetricException {
+
+        // convert all strings to numeric Ids for the metric key and add to the metadata cache
+        long metricTimestamp = metric.getTimestamp();
+        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
+        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
+        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
+        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
+        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
+        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
+
+        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
+                componentId, executorId, hostId, metric.getPort(), streamId);
+
+        // save metric key/value to be batched
+        RocksDbValue value = new RocksDbValue(metric);
+        insertBatch.put(key, value);
+
+        // Aggregate matching metrics over bucket timeframes.
+        // We'll process starting with the longest bucket. If the metric for this does not exist, we don't have to
+        // search for the remaining bucket metrics.
+        ListIterator li = aggBuckets.listIterator(aggBuckets.size());
+        boolean populate = true;
+        while (li.hasPrevious()) {
+            AggLevel bucket = (AggLevel) li.previous();
+            Metric aggMetric = new Metric(metric);
+            aggMetric.setAggLevel(bucket);
+
+            long msToBucket = 1000L * 60L * bucket.getValue();
+            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
+            aggMetric.setTimestamp(roundedToBucket);
+
+            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
+                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
+
+            if (populate) {
+                // retrieve any existing aggregation matching this one and update the values
+                if (store.populateFromKey(aggKey, aggMetric)) {
+                    aggMetric.addValue(metric.getValue());
+                } else {
+                    // aggregating metric did not exist, don't look for further ones with smaller timestamps
+                    populate = false;
+                }
+            }
+
+            // save metric key/value to be batched
+            RocksDbValue aggVal = new RocksDbValue(aggMetric);
+            insertBatch.put(aggKey, aggVal);
+        }
+
+        processBatchInsert(insertBatch);
+
+        insertBatch.clear();
+    }
+
+    // converts a metadata string into a unique integer. Updates the timestamp of the string
+    // so we can track when it was last used for later deletion on database cleanup.
+    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
+        if (s == null) {
+            throw new MetricException("No string for metric metadata string type " + type);
+        }
+
+        // attempt to find it in the string cache
+        StringMetadata stringMetadata = stringMetadataCache.get(s);
+        if (stringMetadata != null) {
+            // make sure the timestamp on the metadata has the latest time
+            stringMetadata.update(metricTimestamp, type);
+            return stringMetadata.getStringId();
+        }
+
+        // attempt to find the string in the database
+        stringMetadata = store.rocksDbGetStringMetadata(type, s);
+        if (stringMetadata != null) {
+            // update to the latest timestamp and add to the string cache
+            stringMetadata.update(metricTimestamp, type);
+            stringMetadataCache.put(s, stringMetadata, false);
+            return stringMetadata.getStringId();
+        }
+
+        // string does not exist, create using a unique string id and add to cache
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(type + "." + s + " does not exist in cache or database");
+        }
+        int stringId = getUniqueMetadataStringId();
+        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
+        stringMetadataCache.put(s, stringMetadata, true);
+
+        return stringMetadata.getStringId();
+    }
+
+    // get a currently unused unique string id
+    private int getUniqueMetadataStringId() throws MetricException {
+        generateUniqueStringIds();
+        int id = unusedIds.iterator().next();
+        unusedIds.remove(id);
+        return id;
+    }
+
+    // guarantees a set of unused string Ids exists. Once the set is empty, refills it
+    // by generating random numbers and removing any that are already in use.
+    private void generateUniqueStringIds() throws MetricException {
+        int attempts = 0;
+        while (unusedIds.isEmpty()) {
+            attempts++;
+            if (attempts > 100) {
+                String message = "Failed to generate unique ids";
+                LOG.error(message);
+                throw new MetricException(message);
+            }
+            for (int i = 0; i < 600; i++) {
+                int n = ThreadLocalRandom.current().nextInt();
+                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
+                    continue;
+                }
+                // skip any ids already in use by the cache
+                if (stringMetadataCache.contains(n)) {
+                    continue;
+                }
+                unusedIds.add(n);
+            }
+            // now scan all metadata and remove any matching string Ids from this list
+            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
+                unusedIds.remove(key.getMetadataStringId());
+                return true; // process all metadata
+            });
+        }
+    }
+
+    // writes multiple metric values into the database as a batch operation
+    private void processBatchInsert(Map<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
+        try (WriteBatch writeBatch = new WriteBatch()) {
--- End diff --

The reason the insertBatch map is being used is to force the keys into sorted order. RocksDB indicated that inserting in sorted key order would be faster. The comment on line 54 was trying to convey this information. Let me know if I can be clearer.
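In case a standalone illustration helps, here is a minimal sketch of the pattern (the class name, keys, and DB path are hypothetical placeholders, not code from this PR): a TreeMap iterates its entries in ascending key order, so the WriteBatch is filled with sorted keys before the single write() call.

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.TreeMap;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class SortedBatchInsertSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            // entries are added to the map in arbitrary order...
            TreeMap<String, String> batchMap = new TreeMap<>();
            batchMap.put("metric-c", "3");
            batchMap.put("metric-a", "1");
            batchMap.put("metric-b", "2");

            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/sorted-batch-sketch");
                 WriteBatch writeBatch = new WriteBatch();
                 WriteOptions writeOpts = new WriteOptions()) {

                // ...but the TreeMap iterates them in ascending key order,
                // so the batch is built with sorted keys
                for (Map.Entry<String, String> e : batchMap.entrySet()) {
                    writeBatch.put(e.getKey().getBytes(StandardCharsets.UTF_8),
                                   e.getValue().getBytes(StandardCharsets.UTF_8));
                }

                // one atomic write of the whole sorted batch
                db.write(writeOpts, writeBatch);
            }
        }
    }

Note that since insertBatch is constructed with no comparator, this relies on RocksDbKey implementing Comparable to define the sort order over the serialized keys.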
---