reswqa commented on code in PR #28402:
URL: https://github.com/apache/flink/pull/28402#discussion_r3432674677


##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;
+            db.deleteRange(writeOptions, startKey, endKey);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete prefix: " + keyPrefix, e);
+        }
+    }
+
+    @Override
+    public List<String> getEntriesByPrefix(String prefix) throws IOException {
+        List<String> result = new ArrayList<>();
+        if (prefix == null || prefix.isEmpty()) {
+            return result;
+        }
+
+        try (RocksIterator iterator = db.newIterator()) {
+            byte[] prefixBytes = prefix.getBytes(UTF_8);
+            iterator.seek(prefixBytes);
+            while (iterator.isValid()) {
+                byte[] keyBytes = iterator.key();
+                byte[] valueBytes = iterator.value();
+                String currentKey = new String(keyBytes, UTF_8);
+                String currentValue = new String(valueBytes, UTF_8);
+
+                if (currentKey.startsWith(prefix)) {
+                    result.add(currentValue);
+                    iterator.next();
+                } else {
+                    break;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void close() {
+        handlesToClose.forEach(IOUtils::closeQuietly);

Review Comment:
   I think we should close `db` first. The `db` internally holds references to 
objects from `options` — including the bloom filter. As long as the `db` is 
alive, these resources are still in use by background threads, flushes, and 
compactions. If you free the `options` before closing the `db`, the `db` may 
access freed memory (i.e., a use-after-free).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to