This is an automated email from the ASF dual-hosted git repository.

ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a52ef9db2 Add Default Configuration for RocksDB MaxManifestFileSize (#1706)
a52ef9db2 is described below

commit a52ef9db29bf9c7d6ff2d149a4a46e7d703e5533
Author: Haolan Ye <[email protected]>
AuthorDate: Mon Aug 26 11:41:16 2024 -0700

    Add Default Configuration for RocksDB MaxManifestFileSize (#1706)
    
    * add default configuration for rocksdb max manifest file size
    
    * update doc, clarify the configuration
    
    * create constant for default store name
    
    * address minor comments
    
    * table format
    
    * one-line
---
 .../documentation/versioned/jobs/samza-configurations.md     |  3 ++-
 .../src/main/java/org/apache/samza/config/StorageConfig.java | 11 +++++++++++
 .../test/java/org/apache/samza/config/TestStorageConfig.java | 11 +++++++++++
 .../org/apache/samza/storage/kv/RocksDbKeyValueReader.java   |  7 ++++++-
 .../org/apache/samza/storage/kv/RocksDbOptionsHelper.java    | 10 ++++------
 .../storage/kv/RocksDbKeyValueStorageEngineFactory.scala     | 12 ++++++++++--
 6 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 9ccb4192e..a6cd5ce16 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -322,7 +322,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.<br>rocksdb.keep.log.file.num|2|The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.|
 |stores.**_store-name_**.<br>rocksdb.metrics.list|(none)|A list of [RocksDB properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409) to expose as metrics (gauges).|
 |stores.**_store-name_**.<br>rocksdb.delete.obsolete.files.period.micros|21600000000|This property specifies the period in microseconds to delete obsolete files regardless of files removed during compaction. Allowed range is up to 9223372036854775807.|
-|stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|18446744073709551615|This property specifies the maximum size of the MANIFEST data file, after which it is rotated. Default value is also the maximum, making it practically unlimited: only one manifest file is used.|
+|stores-default.<br>rocksdb.max.manifest.file.size.bytes|1073741824| This property specifies the default maximum size (in bytes) of the MANIFEST data file for **ANY** stores, after which it is rotated. The default value is 1GB. The value for a specific store can be configured by `stores.store-name.rocksdb.max.manifest.file.size`.|
+|stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|stores-default.<br>rocksdb.max.manifest.file.size.bytes| This property specifies the maximum size (in bytes) of the MANIFEST data file for a specific store, after which it is rotated. The default value is defined by `stores-default.rocksdb.max.manifest.file.size.bytes`.|
 |stores.**_store-name_**.<br>side.inputs|(none)|Samza applications with stores that are populated by a secondary data sources such as HDFS, but otherwise ready-only, can leverage side inputs. Stores configured with side inputs use the the source streams to bootstrap data in the absence of local copy thereby, reducing additional copy of the data in changelog. It is also recommended to enable host affinity feature when turning on side inputs to prevent bootstrapping of the data during cont [...]
 |stores.**_store-name_**.<br>side.inputs.processor.factory|(none)|The value is a fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/SideInputProcessorFactory.html">SideInputProcessorFactory</a>. It is a required configuration for stores with side inputs (`stores.store-name.side.inputs`).
 
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 0bb993f19..df96ac629 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -79,7 +79,9 @@ public class StorageConfig extends MapConfig {
   public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX;
   public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX;
   public static final List<String> DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY);
+  public static final long DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1024L;
 
+  static final String DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE = "stores-default.rocksdb.max.manifest.file.size.bytes";
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
   static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);
@@ -263,6 +265,15 @@ public class StorageConfig extends MapConfig {
     return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
   }
 
+  /**
+   * Helper method to get the default RocksDB max manifest file size in bytes for ANY stores, which is default to 1GB.
+   * The default value for ANY stores can be configured by "stores-default.rocksdb.max.manifest.file.size.bytes",
+   * and the value for a specific store can be configured by "stores.store-name.rocksdb.max.manifest.file.size".
+   */
+  public long getDefaultMaxManifestFileSizeBytes() {
+    return getLong(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE, DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES);
+  }
+
   /**
    * Helper method to check if there is any stores configured w/ a changelog
    */
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 953f6efe6..c9f3f01f6 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -590,4 +590,15 @@ public class TestStorageConfig {
     assertEquals(Collections.emptyList(),
         new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
   }
+
+  @Test
+  public void testGetMaxManifestFileSize() {
+    // empty config, return default size, which is 1GB
+    assertEquals(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES,
+        new StorageConfig(new MapConfig()).getDefaultMaxManifestFileSizeBytes());
+
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE), "1024")));
+    assertEquals(1024, storageConfig.getDefaultMaxManifestFileSizeBytes());
+  }
 }
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 987a60631..119b59006 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -61,7 +61,12 @@ public class RocksDbKeyValueReader {
     valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig);
 
     // get db options
-    Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite);
+    Options options = RocksDbOptionsHelper.options(config,
+        1,
+        storageConfig.getDefaultMaxManifestFileSizeBytes(),
+        new File(dbPath),
+        StorageEngineFactory.StoreMode.ReadWrite
+    );
 
     // open the db
     RocksDB.loadLibrary();
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 869adb857..cc601840e 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -86,8 +86,8 @@ public class RocksDbOptionsHelper {
    */
   private static final int DEFAULT_ROCKSDB_MAX_BACKGROUND_JOBS = 4;
 
-
-  public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
+  public static Options options(Config storeConfig, int numTasksForContainer, long defaultMaxManifestFileSize,
+      File storeDir, StorageEngineFactory.StoreMode storeMode) {
     Options options = new Options();
 
     if (storeConfig.getBoolean(ROCKSDB_WAL_ENABLED, false)) {
@@ -143,10 +143,8 @@ public class RocksDbOptionsHelper {
     options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
     options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
     options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
-    // The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
-    if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
-      options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
-    }
+    // The default for rocksdb is 1GB (1024*1024*1024 bytes)
+    options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE, defaultMaxManifestFileSize));
     // use prepareForBulk load only when i. the store is being requested in BulkLoad mode
     // and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with
     // existing stores : https://github.com/facebook/rocksdb/issues/2734
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index afc289ad7..dce443b8c 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -42,14 +42,22 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
     registry: MetricsRegistry,
     jobContext: JobContext,
     containerContext: ContainerContext, storeMode: StoreMode): KeyValueStore[Array[Byte], Array[Byte]] = {
+    val storageConfig = new StorageConfig(jobContext.getConfig)
     val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true)
-    val isLoggedStore = new StorageConfig(jobContext.getConfig).getChangelogStream(storeName).isPresent
+    val isLoggedStore = storageConfig.getChangelogStream(storeName).isPresent
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
     val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size()
+    val defaultMaxManifestFileSize = storageConfig.getDefaultMaxManifestFileSizeBytes
     rocksDbMetrics.newGauge("rocksdb.block-cache-size",
       () => RocksDbOptionsHelper.getBlockCacheSize(storageConfigSubset, numTasksForContainer))
 
-    val rocksDbOptions = RocksDbOptionsHelper.options(storageConfigSubset, numTasksForContainer, storeDir, storeMode)
+    val rocksDbOptions = RocksDbOptionsHelper.options(
+      storageConfigSubset,
+      numTasksForContainer,
+      defaultMaxManifestFileSize,
+      storeDir,
+      storeMode
+    )
     val rocksDbWriteOptions = new WriteOptions()
 
     if (!storageConfigSubset.getBoolean(RocksDbOptionsHelper.ROCKSDB_WAL_ENABLED, false)) {
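
The lookup order introduced by this commit can be sketched as follows: the per-store key wins, then the job-wide "stores-default" key, then the built-in 1GB constant. This is a minimal standalone sketch, not part of the patch. The class ManifestSizeResolution and the method resolve are hypothetical names, and a plain Map stands in for Samza's Config; only the two key names and the 1GB value come from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; it does not exist in Samza.
public class ManifestSizeResolution {
  // Key name and fallback value as they appear in the diff above.
  static final String DEFAULT_KEY = "stores-default.rocksdb.max.manifest.file.size.bytes";
  static final long FALLBACK_BYTES = 1024 * 1024 * 1024L;

  // Resolves the max MANIFEST size for one store:
  // per-store key first, then the job-wide default key, then 1GB.
  static long resolve(Map<String, String> config, String storeName) {
    String storeKey = "stores." + storeName + ".rocksdb.max.manifest.file.size";
    String perStore = config.get(storeKey);
    if (perStore != null) {
      return Long.parseLong(perStore);
    }
    String jobWide = config.get(DEFAULT_KEY);
    if (jobWide != null) {
      return Long.parseLong(jobWide);
    }
    return FALLBACK_BYTES;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Nothing configured: falls back to the built-in constant.
    System.out.println(resolve(config, "my-store"));

    // Job-wide default applies to every store without its own setting.
    config.put(DEFAULT_KEY, "536870912");
    System.out.println(resolve(config, "my-store"));

    // A per-store value overrides the job-wide default for that store only.
    config.put("stores.my-store.rocksdb.max.manifest.file.size", "1024");
    System.out.println(resolve(config, "my-store"));
    System.out.println(resolve(config, "other-store"));
  }
}
```

In the patch itself the same order falls out of two getLong calls: StorageConfig.getDefaultMaxManifestFileSizeBytes() supplies the job-wide value, and RocksDbOptionsHelper.options() passes it as the fallback when reading the per-store key.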
