This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 23627cc  [SAMZA-2655] Expose RocksDB maxOpenFiles setting as Samza parameter (#1500)
23627cc is described below

commit 23627ccd4c35f7c282d20febcfdf95f1df7e7e24
Author: Adam Faris <[email protected]>
AuthorDate: Thu May 13 09:35:09 2021 -0700

    [SAMZA-2655] Expose RocksDB maxOpenFiles setting as Samza parameter (#1500)
    
    Expose RocksDB maxOpenFiles setting as Samza parameter
---
 .../versioned/jobs/configuration-table.html        | 16 +++++++++
 .../samza/storage/kv/RocksDbOptionsHelper.java     |  4 +++
 .../kv/descriptors/RocksDbTableDescriptor.java     | 40 ++++++++++++++++++++++
 .../kv/descriptors/TestRocksDbTableDescriptor.java |  6 +++-
 4 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b588e64..18a782f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1912,6 +1912,22 @@
                 </tr>
 
                 <tr>
+                    <td class="property" 
id="stores-rocksdb-max-open-files">stores.<span 
class="store">store-name</span>.<br>rocksdb.max.open.files</td>
+                    <td class="default">-1</td>
+                    <td class="description">
+                        Limits the number of files that RocksDB can keep open at one time. A value of -1 means opened files are always kept open.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="stores-rocksdb-max-file-opening-threads">stores.<span 
class="store">store-name</span>.<br>rocksdb.max.file.opening.threads</td>
+                    <td class="default">16</td>
+                    <td class="description">
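+                        Sets the number of threads used to open RocksDB files. This matters when rocksdb.max.open.files is -1, because RocksDB then opens all files when the database is opened.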
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" 
id="stores-rocksdb-metrics">stores.<span 
class="store">store-name</span>.<br>rocksdb.metrics.list</td>
                     <td class="default"></td>
                     <td class="description">
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index b7c47e1..5af2ef8 100644
--- 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -44,6 +44,8 @@ public class RocksDbOptionsHelper {
   private static final String ROCKSDB_KEEP_LOG_FILE_NUM = 
"rocksdb.keep.log.file.num";
   private static final String ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS = 
"rocksdb.delete.obsolete.files.period.micros";
   private static final String ROCKSDB_MAX_MANIFEST_FILE_SIZE = 
"rocksdb.max.manifest.file.size";
+  private static final String ROCKSDB_MAX_OPEN_FILES = 
"rocksdb.max.open.files";
+  private static final String ROCKSDB_MAX_FILE_OPENING_THREADS = 
"rocksdb.max.file.opening.threads";
 
   public static Options options(Config storeConfig, int numTasksForContainer, 
File storeDir, StorageEngineFactory.StoreMode storeMode) {
     Options options = new Options();
@@ -109,6 +111,8 @@ public class RocksDbOptionsHelper {
     
options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 
64 * 1024 * 1024L));
     options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 
2));
     
options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS,
 21600000000L));
+    options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
+    
options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS,
 16));
     // The default for rocksdb is 18446744073709551615, which is larger than 
java Long.MAX_VALUE. Hence setting it only if it's passed.
     if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
       
options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
index 0044904..f9a1f24 100644
--- 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -49,6 +49,9 @@ public class RocksDbTableDescriptor<K, V> extends 
LocalTableDescriptor<K, V, Roc
   static final public String ROCKSDB_NUM_WRITE_BUFFERS = 
"rocksdb.num.write.buffers";
   static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = 
"rocksdb.max.log.file.size.bytes";
   static final public String ROCKSDB_KEEP_LOG_FILE_NUM = 
"rocksdb.keep.log.file.num";
+  static final public String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+  static final public String ROCKSDB_MAX_FILE_OPENING_THREADS = 
"rocksdb.max.file.opening.threads";
+
 
   private Integer writeBatchSize;
   private Integer objectCacheSize;
@@ -61,6 +64,8 @@ public class RocksDbTableDescriptor<K, V> extends 
LocalTableDescriptor<K, V, Roc
   private Integer numLogFilesToKeep;
   private String compressionType;
   private String compactionStyle;
+  private Integer maxOpenFiles;
+  private Integer maxFileOpeningThreads;
 
   /**
    * Constructs a table descriptor instance
@@ -273,6 +278,35 @@ public class RocksDbTableDescriptor<K, V> extends 
LocalTableDescriptor<K, V, Roc
     return this;
   }
 
+  /**
+   * Limits the number of open files that can be used by the DB. You may need to increase this if your database
+   * has a large working set. A value of -1 means opened files are always kept open.
+   * <p>
+   *  Default value is -1.
+   * <p>
+   * @param maxOpenFiles the number of open files that can be used by the DB.
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxOpenFiles(int maxOpenFiles) {
+    this.maxOpenFiles = maxOpenFiles;
+    return this;
+  }
+
+  /**
+   * Sets the number of threads used to open DB files.
+   * If the maximum number of open files is -1 (see {@link #withMaxOpenFiles(int)}), the DB opens all files when it
+   * is opened; use this option to increase the number of threads used to open them.
+   * <p>
+   * Default is 16.
+   * <p>
+   * @param maxFileOpeningThreads the number of threads to use when opening DB files.
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxFileOpeningThreads(int 
maxFileOpeningThreads) {
+    this.maxFileOpeningThreads = maxFileOpeningThreads;
+    return this;
+  }
+
   @Override
   public String getProviderFactoryClassName() {
     return LocalTableProviderFactory.class.getName();
@@ -320,6 +354,12 @@ public class RocksDbTableDescriptor<K, V> extends 
LocalTableDescriptor<K, V, Roc
     if (numLogFilesToKeep != null) {
       addStoreConfig(ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString(), 
tableConfig);
     }
+    if (maxOpenFiles != null) {
+      addStoreConfig(ROCKSDB_MAX_OPEN_FILES, maxOpenFiles.toString(), 
tableConfig);
+    }
+    if (maxFileOpeningThreads != null) {
+      addStoreConfig(ROCKSDB_MAX_FILE_OPENING_THREADS, 
maxFileOpeningThreads.toString(), tableConfig);
+    }
 
     return Collections.unmodifiableMap(tableConfig);
   }
diff --git 
a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index c58d123..9fc9938 100644
--- 
a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ 
b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -73,10 +73,12 @@ public class TestRocksDbTableDescriptor {
         .withTtl(7)
         .withWriteBatchSize(8)
         .withWriteBufferSize(9)
+        .withMaxOpenFiles(10)
+        .withMaxFileOpeningThreads(11)
         .withConfig("abc", "xyz")
         .toConfig(createJobConfig());
 
-    Assert.assertEquals(14, tableConfig.size());
+    Assert.assertEquals(16, tableConfig.size());
     assertEquals("1", RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES, 
tableConfig);
     assertEquals("2", RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES, 
tableConfig);
     assertEquals("3", RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 
tableConfig);
@@ -86,6 +88,8 @@ public class TestRocksDbTableDescriptor {
     assertEquals("7", RocksDbTableDescriptor.ROCKSDB_TTL_MS, tableConfig);
     assertEquals("8", RocksDbTableDescriptor.WRITE_BATCH_SIZE, tableConfig);
     assertEquals("9", 
RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES, tableConfig);
+    assertEquals("10", RocksDbTableDescriptor.ROCKSDB_MAX_OPEN_FILES, 
tableConfig);
+    assertEquals("11", 
RocksDbTableDescriptor.ROCKSDB_MAX_FILE_OPENING_THREADS, tableConfig);
     assertEquals("snappy", RocksDbTableDescriptor.ROCKSDB_COMPRESSION, 
tableConfig);
     assertEquals("fifo", RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE, 
tableConfig);
     
Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM,
 TABLE_ID)));

Reply via email to