Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162768064
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 ---
    @@ -0,0 +1,639 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new 
LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to 
stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via 
the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = 
StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = 
ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING),
 false);
    +
    +        try (Options options = new 
Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from {}", path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RockDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer deletionPeriod = 0;
    +        if 
(config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
    +            deletionPeriod = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, 
deletionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, 
this.failureMeter);
    +
    +        int cacheCapacity = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = 
StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is setup
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, 
"RocksDbMetricsCleaner");
    +        thread.setDaemon(true);
    +        thread.start();
    +
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.setDaemon(true);
    +        thread.start();
    +    }
    +
    +    /**
    +     * Implements configuration validation of Metrics Store, validates 
storm configuration for Metrics Store.
    +     *
    +     * @param config Storm config to specify which store type, location of 
store and creation policy
    +     * @throws MetricException if there is a missing required 
configuration or if the store does not exist but
    +     *                         the config specifies not to create the store
    +     */
    +    private void validateConfig(Map config) throws MetricException {
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - 
Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        }
    +
    +        if 
(!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - 
Does not specify creation policy "
    +                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
    +        }
    +
    +        // validate path defined
    +        String storePath = getRocksDbAbsoluteDir(config);
    +
    +        boolean createIfMissing = 
ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING),
 false);
    +        if (!createIfMissing) {
    +            if (!(new File(storePath).exists())) {
    +                throw new MetricException("Configuration specifies not to 
create a store but no store currently exists at " + storePath);
    +            }
    +        }
    +
    +        if 
(!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY)))
 {
    +            throw new MetricException("Not a valid RocksDB configuration - 
Missing metadata string cache size "
    +                    + 
DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
    +        }
    +
    +        if 
(!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
    +            throw new MetricException("Not a valid RocksDB configuration - 
Missing metric retention "
    +                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
    +        }
    +    }
    +
    +    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    +        String storePath = 
(String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        if (storePath == null) {
    +            throw new MetricException("Not a vaild RocksDB configuration - 
Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        } else {
    +            if (new File(storePath).isAbsolute()) {
    +                return storePath;
    +            } else {
    +                String stormHome = System.getProperty("storm.home");
    --- End diff --
    
    Yeah, my bad. We didn't have one. Let's leave it as is, since it's a nit.


---

Reply via email to