Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160245178
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache,
    + * older entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    +    private WriteOptions writeOpts = new WriteOptions();
    +    private volatile boolean shutdown = false;
    +    private Meter failureMeter;
    +    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
    +
    +    /**
    +     * Constructor for the RocksDbMetricsWriter.
    +     *
    +     * @param store         The RocksDB store
    +     * @param queue         The queue to receive metrics for insertion
    +     * @param failureMeter  The meter to mark when a metric insert fails
    +     */
    +    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) {
    +        this.store = store;
    +        this.queue = queue;
    +        this.failureMeter = failureMeter;
    +
    +        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
    +    }
    +
    +    /**
    +     * Init routine called once the Metadata cache has been created.
    +     *
    +     * @throws MetricException  on cache error
    +     */
    +    void init() throws MetricException {
    +        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
    +    }
    +
    +    /**
    +     * Run routine to wait for metrics on a queue and insert into RocksDB.
    +     */
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    +                return;
    +            }
    +            try {
    +                Metric m = (Metric) queue.take();
    +                processInsert(m);
    +            } catch (Exception e) {
    +                LOG.error("Failed to insert metric", e);
    +                if (this.failureMeter != null) {
    +                    this.failureMeter.mark();
    +                }
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Performs the actual metric insert, and aggregates over all bucket times.
    +     *
    +     * @param metric  Metric to store
    +     * @throws MetricException  if database write fails
    +     */
    +    private void processInsert(Metric metric) throws MetricException {
    +
    +        // convert all strings to numeric Ids for the metric key and add to the metadata cache
    +        long metricTimestamp = metric.getTimestamp();
    +        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
    +        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
    +        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
    +        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
    +        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
    +        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
    +
    +        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
    +                componentId, executorId, hostId, metric.getPort(), streamId);
    +
    +        // save metric key/value to be batched
    +        RocksDbValue value = new RocksDbValue(metric);
    +        insertBatch.put(key, value);
    +
    +        // Aggregate matching metrics over bucket timeframes.
    +        // We'll process starting with the longest bucket.  If the metric for this bucket does not exist, we don't have to
    +        // search for the remaining bucket metrics.
    +        ListIterator li = aggBuckets.listIterator(aggBuckets.size());
    +        boolean populate = true;
    +        while (li.hasPrevious()) {
    +            AggLevel bucket = (AggLevel)li.previous();
    +            Metric aggMetric = new Metric(metric);
    +            aggMetric.setAggLevel(bucket);
    +
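    +            // round the metric timestamp down to the start of its aggregation bucket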
    +            long msToBucket = 1000L * 60L * bucket.getValue();
    +            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
    +            aggMetric.setTimestamp(roundedToBucket);
    +
    +            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
    +                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
    +
    +            if (populate) {
    +                // retrieve any existing aggregation matching this one and update the values
    +                if (store.populateFromKey(aggKey, aggMetric)) {
    +                    aggMetric.addValue(metric.getValue());
    +                } else {
    +                    // the aggregating metric did not exist, so don't look for any at the shorter bucket levels
    +                    populate = false;
    +                }
    +            }
    +
    +            // save metric key/value to be batched
    +            RocksDbValue aggVal = new RocksDbValue(aggMetric);
    +            insertBatch.put(aggKey, aggVal);
    +        }
    +
    +        processBatchInsert(insertBatch);
    +
    +        insertBatch.clear();
    +    }
    +
    +    // converts a metadata string into a unique integer.  Updates the timestamp of the string
    +    // so we can track when it was last used for later deletion on database cleanup.
    +    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
    +        if (s == null) {
    +            throw new MetricException("No string for metric metadata string type " + type);
    +        }
    +
    +        // attempt to find it in the string cache
    +        StringMetadata stringMetadata = stringMetadataCache.get(s);
    +        if (stringMetadata != null) {
    +            // make sure the timestamp on the metadata has the latest time
    +            stringMetadata.update(metricTimestamp, type);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // attempt to find the string in the database
    +        stringMetadata = store.rocksDbGetStringMetadata(type, s);
    +        if (stringMetadata != null) {
    +            // update to the latest timestamp and add to the string cache
    +            stringMetadata.update(metricTimestamp, type);
    +            stringMetadataCache.put(s, stringMetadata, false);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // string does not exist, create using a unique string id and add to cache
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug(type + "." + s + " does not exist in cache or database");
    +        }
    +        int stringId = getUniqueMetadataStringId();
    +        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
    +        stringMetadataCache.put(s, stringMetadata, true);
    +
    +        return stringMetadata.getStringId();
    +    }
    +
    +    // get a currently unused unique string id
    +    private int getUniqueMetadataStringId() throws MetricException {
    +        generateUniqueStringIds();
    +        int id = unusedIds.iterator().next();
    +        unusedIds.remove(id);
    +        return id;
    +    }
    +
    +    // guarantees a list of unused string Ids exists.  Once the list is empty, creates a new list
    +    // by generating a list of random numbers and removing the ones that are already in use.
    +    private void generateUniqueStringIds() throws MetricException {
    +        int attempts = 0;
    +        while (unusedIds.isEmpty()) {
    +            attempts++;
    +            if (attempts > 100) {
    +                String message = "Failed to generate unique ids";
    +                LOG.error(message);
    +                throw new MetricException(message);
    +            }
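    +            // generate a batch of random candidate ids, skipping the invalid id and any already cached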
    +            for (int i = 0; i < 600; i++) {
    +                int n = ThreadLocalRandom.current().nextInt();
    +                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
    +                    continue;
    +                }
    +                // skip any ids already present in the cache
    +                if (stringMetadataCache.contains(n)) {
    +                    continue;
    +                }
    +                unusedIds.add(n);
    +            }
    +            // now scan all metadata and remove any matching string Ids from this list
    +            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
    +            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
    +            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
    +                unusedIds.remove(key.getMetadataStringId());
    +                return true; // process all metadata
    +            });
    +        }
    +    }
    +
    +    // writes multiple metric values into the database as a batch operation
    +    private void processBatchInsert(Map<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
    +        try (WriteBatch writeBatch = new WriteBatch()) {
    --- End diff --
    
    Because of the thread safety problems with the map, it might be simpler to
    use the WriteBatch object directly instead of trying to use the map first.
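    
    For illustration, something of this shape could work (a hypothetical sketch
    only: the getRaw() accessors and the store's db handle are assumed names
    here, not necessarily this patch's API):
    
        private void processInsertDirect(RocksDbKey key, RocksDbValue value) throws MetricException {
            // stage the key/value straight into a WriteBatch instead of the shared TreeMap,
            // then commit the batch with a single write
            try (WriteBatch writeBatch = new WriteBatch()) {
                writeBatch.put(key.getRaw(), value.getRaw());
                store.db.write(writeOpts, writeBatch);
            } catch (RocksDBException e) {
                throw new MetricException("Failed to store metric batch", e);
            }
        }
    
    Building the batch locally per insert would also avoid sharing mutable
    state across threads.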

