This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06fb0ec1f65056fd0ef5b67013ca9ae288486961 Author: Stephan Ewen <se...@apache.org> AuthorDate: Thu Dec 5 19:44:22 2019 +0100 [FLINK-14484][state-backend] Some refactorings for the RocksDB memory controlling code This closes #10449 --- .../generated/rocks_db_configuration.html | 24 +++ .../state/api/runtime/SavepointEnvironment.java | 5 +- .../streaming/state/RocksDBKeyedStateBackend.java | 5 - .../state/RocksDBMemoryConfiguration.java | 198 ++++++++++++++++++++ .../streaming/state/RocksDBOperationUtils.java | 49 +++++ .../contrib/streaming/state/RocksDBOptions.java | 43 +++-- ...redObjects.java => RocksDBSharedResources.java} | 22 ++- .../streaming/state/RocksDBStateBackend.java | 135 +++----------- .../RocksDBStateBackendBoundedMemoryTest.java | 204 --------------------- .../state/RocksDBStateBackendConfigTest.java | 44 ++--- 10 files changed, 364 insertions(+), 365 deletions(-) diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html index a367b78..660bf8d 100644 --- a/docs/_includes/generated/rocks_db_configuration.html +++ b/docs/_includes/generated/rocks_db_configuration.html @@ -21,6 +21,30 @@ <td>The local directory (on the TaskManager) where RocksDB puts its files.</td> </tr> <tr> + <td><h5>state.backend.rocksdb.memory.fixed-per-slot</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>MemorySize</td> + <td>The fixed total amount of memory, shared among all RocksDB instances per slot. This option overrides the 'state.backend.rocksdb.memory.managed' option when configured. 
If neither this option, nor the 'state.backend.rocksdb.memory.managed' option are set, then each RocksDB column family state has its own memory caches (as controlled by the column family options).</td> + </tr> + <tr> + <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td> + <td style="word-wrap: break-word;">0.1</td> + <td>Double</td> + <td>The fraction of cache memory that is reserved for high-priority data like index, filter, and compression dictionary blocks. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td> + </tr> + <tr> + <td><h5>state.backend.rocksdb.memory.managed</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the state backend will not exceed the available memory, but use as much memory as it can.</td> + </tr> + <tr> + <td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td> + <td style="word-wrap: break-word;">0.5</td> + <td>Double</td> + <td>The maximum amount of memory that write buffers may take, as a fraction of the total cache memory. 
This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td> + </tr> + <tr> <td><h5>state.backend.rocksdb.options-factory</h5></td> <td style="word-wrap: break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td> <td>String</td> diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java index 625e95a..a7a1db7 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java @@ -81,6 +81,8 @@ public class SavepointEnvironment implements Environment { private final IOManager ioManager; + private final MemoryManager memoryManager; + private final AccumulatorRegistry accumulatorRegistry; private SavepointEnvironment(RuntimeContext ctx, Configuration configuration, int maxParallelism, int indexOfSubtask, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState) { @@ -97,6 +99,7 @@ public class SavepointEnvironment implements Environment { this.registry = new KvStateRegistry().createTaskRegistry(jobID, vertexID); this.taskStateManager = new SavepointTaskStateManager(prioritizedOperatorSubtaskState); this.ioManager = new IOManagerAsync(); + this.memoryManager = MemoryManager.forDefaultPageSize(64 * 1024 * 1024); this.accumulatorRegistry = new AccumulatorRegistry(jobID, attemptID); } @@ -162,7 +165,7 @@ public class SavepointEnvironment implements Environment { @Override public MemoryManager getMemoryManager() { - throw new UnsupportedOperationException(ERROR_MSG); + return memoryManager; } @Override diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 07b1fd8..16c8757 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -403,11 +403,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @VisibleForTesting - DBOptions getDbOptions() { - return dbOptions; - } - - @VisibleForTesting boolean isDisposed() { return this.disposed; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java new file mode 100644 index 0000000..f3f9442 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * The settings regarding RocksDB's memory usage. + */ +public final class RocksDBMemoryConfiguration implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Flag whether to use the managed memory budget for RocksDB. Null if not set. */ + @Nullable + private Boolean useManagedMemory; + + /** The total memory for all RocksDB instances at this slot. Null if not set. */ + @Nullable + private MemorySize fixedMemoryPerSlot; + + /** The maximum fraction of the shared cache consumed by the write buffers. Null if not set. */ + @Nullable + private Double writeBufferRatio; + + /** The high priority pool ratio in the shared cache, used for index and filter blocks. Null if not set. */ + @Nullable + private Double highPriorityPoolRatio; + + // ------------------------------------------------------------------------ + + /** + * Configures RocksDB to use the managed memory of a slot. + * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details. + */ + public void setUseManagedMemory(boolean useManagedMemory) { + this.useManagedMemory = useManagedMemory; + } + + /** + * Configures RocksDB to use a fixed amount of memory shared between all instances (operators) in a slot. 
+ * See {@link RocksDBOptions#FIX_PER_SLOT_MEMORY_SIZE} for details. + */ + public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot) { + checkArgument(fixedMemoryPerSlot == null || fixedMemoryPerSlot.getBytes() > 0, + "Total memory per slot must be > 0"); + + this.fixedMemoryPerSlot = fixedMemoryPerSlot; + } + + /** + * Configures RocksDB to use a fixed amount of memory shared between all instances (operators) in a slot. + * See {@link #setFixedMemoryPerSlot(MemorySize)} for details. + */ + public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr) { + setFixedMemoryPerSlot(MemorySize.parse(totalMemoryPerSlotStr)); + } + + /** + * Sets the fraction of the total memory to be used for write buffers. + * This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details. + */ + public void setWriteBufferRatio(double writeBufferRatio) { + Preconditions.checkArgument(writeBufferRatio > 0 && writeBufferRatio < 1.0, + "Write Buffer ratio %s must be in (0, 1)", writeBufferRatio); + this.writeBufferRatio = writeBufferRatio; + } + + /** + * Sets the fraction of the total memory to be used for high priority blocks like indexes, dictionaries, etc. + * This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details. + */ + public void setHighPriorityPoolRatio(double highPriorityPoolRatio) { + Preconditions.checkArgument(highPriorityPoolRatio > 0 && highPriorityPoolRatio < 1.0, + "High priority pool ratio %s must be in (0, 1)", highPriorityPoolRatio); + this.highPriorityPoolRatio = highPriorityPoolRatio; + } + + /** + * Gets whether the state backend is configured to use the managed memory of a slot for RocksDB. + * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details. 
+ */ + public boolean isUsingManagedMemory() { + return useManagedMemory != null ? useManagedMemory : RocksDBOptions.USE_MANAGED_MEMORY.defaultValue(); + } + + /** + * Gets whether the state backend is configured to use a fixed amount of memory shared between all + * RocksDB instances (in all tasks and operators) of a slot. + * See {@link RocksDBOptions#FIX_PER_SLOT_MEMORY_SIZE} for details. + */ + public boolean isUsingFixedMemoryPerSlot() { + return fixedMemoryPerSlot != null; + } + + /** + * Gets the fixed amount of memory to be shared between all RocksDB instances (in all tasks and + * operators) of a slot. Null if not configured. + * See {@link RocksDBOptions#FIX_PER_SLOT_MEMORY_SIZE} for details. + */ + @Nullable + public MemorySize getFixedMemoryPerSlot() { + return fixedMemoryPerSlot; + } + + /** + * Gets the fraction of the total memory to be used for write buffers. + * This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details. + */ + public double getWriteBufferRatio() { + return writeBufferRatio != null ? writeBufferRatio : RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(); + } + + /** + * Gets the fraction of the total memory to be used for high priority blocks like indexes, dictionaries, etc. + * This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details. + */ + public double getHighPriorityPoolRatio() { + return highPriorityPoolRatio != null ? highPriorityPoolRatio : RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(); + } + + // ------------------------------------------------------------------------ + + /** + * Validates if the configured options are valid with respect to one another. 
+ */ + public void validate() { + if (writeBufferRatio != null && highPriorityPoolRatio != null && writeBufferRatio + highPriorityPoolRatio > 1.0) { + throw new IllegalArgumentException(String.format( + "Invalid configuration: Sum of writeBufferRatio %s and highPriPoolRatio %s should be less than 1.0", + writeBufferRatio, highPriorityPoolRatio)); + } + } + + // ------------------------------------------------------------------------ + + /** + * Derives a RocksDBMemoryConfiguration from another object and a configuration. + * The values set on the other object take precedence, and the values from the configuration are + * used if no values are set on the other config object. + */ + public static RocksDBMemoryConfiguration fromOtherAndConfiguration( + RocksDBMemoryConfiguration other, + Configuration config) { + + final RocksDBMemoryConfiguration newConfig = new RocksDBMemoryConfiguration(); + + newConfig.fixedMemoryPerSlot = other.fixedMemoryPerSlot != null + ? other.fixedMemoryPerSlot + : config.get(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE); + + newConfig.writeBufferRatio = other.writeBufferRatio != null + ? other.writeBufferRatio + : config.getDouble(RocksDBOptions.WRITE_BUFFER_RATIO); + + newConfig.highPriorityPoolRatio = other.highPriorityPoolRatio != null + ? 
other.highPriorityPoolRatio + : config.getDouble(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO); + + return newConfig; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 1e3fb8f..1f570ff 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -19,17 +19,24 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.LongFunctionWithException; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.LRUCache; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBufferManager; +import org.slf4j.Logger; import javax.annotation.Nullable; @@ -46,6 +53,11 @@ import static org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend. * Utils for RocksDB Operations. 
*/ public class RocksDBOperationUtils { + + private static final String MANAGED_MEMORY_RESOURCE_ID = "state-rocks-managed-memory"; + + private static final String FIXED_SLOT_MEMORY_RESOURCE_ID = "state-rocks-fixed-slot-memory"; + public static RocksDB openDB( String path, List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, @@ -164,4 +176,41 @@ public class RocksDBOperationUtils { // ignore } } + + @Nullable + public static OpaqueMemoryResource<RocksDBSharedResources> allocateSharedCachesIfConfigured( + RocksDBMemoryConfiguration memoryConfig, + MemoryManager memoryManager, + Logger logger) throws IOException { + + if (!memoryConfig.isUsingFixedMemoryPerSlot() && !memoryConfig.isUsingManagedMemory()) { + return null; + } + + final double highPriorityPoolRatio = memoryConfig.getHighPriorityPoolRatio(); + final double writeBufferRatio = memoryConfig.getWriteBufferRatio(); + + final LongFunctionWithException<RocksDBSharedResources, Exception> allocator = (size) -> { + final Cache cache = new LRUCache(size, -1, false, highPriorityPoolRatio); + final WriteBufferManager wbm = new WriteBufferManager((long) (writeBufferRatio * size), cache); + return new RocksDBSharedResources(cache, wbm); + }; + + try { + if (memoryConfig.isUsingFixedMemoryPerSlot()) { + assert memoryConfig.getFixedMemoryPerSlot() != null; + + logger.info("Getting fixed-size shared cache for RocksDB."); + return memoryManager.getExternalSharedMemoryResource( + FIXED_SLOT_MEMORY_RESOURCE_ID, allocator, memoryConfig.getFixedMemoryPerSlot().getBytes()); + } + else { + logger.info("Getting managed memory shared cache for RocksDB."); + return memoryManager.getSharedMemoryResourceForManagedMemory(MANAGED_MEMORY_RESOURCE_ID, allocator); + } + } + catch (Exception e) { + throw new IOException("Failed to acquire shared cache resource for RocksDB", e); + } + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java index 928882f..5f438bc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java @@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; import static org.apache.flink.contrib.streaming.state.PredefinedOptions.DEFAULT; import static org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED; @@ -86,24 +87,42 @@ public class RocksDBOptions { "The default options factory is %s, and it would read the configured options which provided in 'RocksDBConfigurableOptions'.", DefaultConfigurableOptionsFactory.class.getName())); - public static final ConfigOption<String> BOUNDED_MEMORY_SIZE = ConfigOptions - .key("state.backend.rocksdb.per-slot.total.memory") - .stringType() + public static final ConfigOption<Boolean> USE_MANAGED_MEMORY = ConfigOptions + .key("state.backend.rocksdb.memory.managed") + .booleanType() + .defaultValue(false) + .withDescription("If set, the RocksDB state backend will automatically configure itself to use the " + + "managed memory budget of the task slot, and divide the memory over write buffers, indexes, " + + "block caches, etc. That way, the state backend will not exceed the available memory, but use as much " + + "memory as it can."); + + public static final ConfigOption<MemorySize> FIX_PER_SLOT_MEMORY_SIZE = ConfigOptions + .key("state.backend.rocksdb.memory.fixed-per-slot") + .memoryType() .noDefaultValue() - .withDescription("The total memory size shared among all RocksDB instances per slot. 
" + - "This option has no default value which means no bounded memory limit."); + .withDescription(String.format( + "The fixed total amount of memory, shared among all RocksDB instances per slot. " + + "This option overrides the '%s' option when configured. If neither this option, nor the '%s' option" + + "are set, then each RocksDB column family state has its own memory caches (as controlled by the column " + + "family options).", USE_MANAGED_MEMORY.key(), USE_MANAGED_MEMORY.key())); public static final ConfigOption<Double> WRITE_BUFFER_RATIO = ConfigOptions - .key("state.backend.rocksdb.write-buffer.ratio") + .key("state.backend.rocksdb.memory.write-buffer-ratio") .doubleType() .defaultValue(0.5) - .withDescription(String.format("This option would only take effect when %s is configured, " + - "all RocksDB instances would share the same write buffer manager with the ratio of a LRUCache.", BOUNDED_MEMORY_SIZE)); + .withDescription(String.format( + "The maximum amount of memory that write buffers may take, as a fraction of the total cache memory. " + + "This option only has an effect when '%s' or '%s' are configured.", + USE_MANAGED_MEMORY.key(), + FIX_PER_SLOT_MEMORY_SIZE.key())); - public static final ConfigOption<Double> HIGH_PRI_POOL_RATIO = ConfigOptions - .key("state.backend.rocksdb.high-pri-pool.ratio") + public static final ConfigOption<Double> HIGH_PRIORITY_POOL_RATIO = ConfigOptions + .key("state.backend.rocksdb.memory.high-prio-pool-ratio") .doubleType() .defaultValue(0.1) - .withDescription(String.format("This option would only take effect when %s is configured, " + - "all RocksDB instances would share the high priority ratio for index, filter, and compression dictionary blocks in a LRUCache.", BOUNDED_MEMORY_SIZE)); + .withDescription(String.format( + "The fraction of cache memory that is reserved for high-priority data like index, filter, and " + + "compression dictionary blocks. 
This option only has an effect when '%s' or '%s' are configured.", + USE_MANAGED_MEMORY.key(), + FIX_PER_SLOT_MEMORY_SIZE.key())); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java similarity index 78% rename from flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java rename to flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java index 188c506..dbd46fa 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,24 +22,20 @@ import org.rocksdb.Cache; import org.rocksdb.WriteBufferManager; /** - * Shared objects among RocksDB instances per slot. + * The set of resources that can be shared by all RocksDB instances in a slot. + * Sharing these resources helps RocksDB keep a predictable resource footprint. 
*/ -public class RocksDBSharedObjects implements AutoCloseable { +final class RocksDBSharedResources implements AutoCloseable { private final Cache cache; + private final WriteBufferManager writeBufferManager; - RocksDBSharedObjects(Cache cache, WriteBufferManager writeBufferManager) { + RocksDBSharedResources(Cache cache, WriteBufferManager writeBufferManager) { this.cache = cache; this.writeBufferManager = writeBufferManager; } - @Override - public void close() { - writeBufferManager.close(); - cache.close(); - } - public Cache getCache() { return cache; } @@ -47,4 +43,10 @@ public class RocksDBSharedObjects implements AutoCloseable { public WriteBufferManager getWriteBufferManager() { return writeBufferManager; } + + @Override + public void close() { + writeBufferManager.close(); + cache.close(); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 90e8583..2dcf5c2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.MemorySize; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.memory.MemoryManager; +import 
org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -56,11 +55,9 @@ import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; import org.rocksdb.TableFormatConfig; -import org.rocksdb.WriteBufferManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,14 +75,10 @@ import java.util.List; import java.util.Random; import java.util.UUID; import java.util.function.Function; -import java.util.function.Supplier; -import static org.apache.flink.contrib.streaming.state.RocksDBOptions.BOUNDED_MEMORY_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; -import static org.apache.flink.contrib.streaming.state.RocksDBOptions.HIGH_PRI_POOL_RATIO; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED; -import static org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -123,8 +116,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1; - static final long UNDEFINED_VALUE = -1; - // ------------------------------------------------------------------------ // -- configuration values, set in the application / configuration @@ -160,17 +151,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu */ private TernaryBoolean enableTtlCompactionFilter; - /** - * Total memory for all rocksDB instances at this 
slot. - * Currently, we would use a LRU cache to serve as the total memory usage. - */ - private long totalMemoryPerSlot; - - /** The write buffer ratio which consumed by write-buffer manager from the shared cache. */ - private double writeBufferRatio; - - /** The high priority pool ratio which consumed by index&filter from the shared cache. */ - private double highPriPoolRatio; + /** The configuration for memory settings (pool sizes, etc.). */ + private final RocksDBMemoryConfiguration memoryConfiguration; /** This determines the type of priority queue state. */ private final PriorityQueueStateType priorityQueueStateType; @@ -293,9 +275,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.priorityQueueStateType = PriorityQueueStateType.HEAP; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; - this.totalMemoryPerSlot = UNDEFINED_VALUE; - this.writeBufferRatio = UNDEFINED_VALUE; - this.highPriPoolRatio = UNDEFINED_VALUE; + this.memoryConfiguration = new RocksDBMemoryConfiguration(); } /** @@ -341,32 +321,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableTtlCompactionFilter = original.enableTtlCompactionFilter .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED)); - configureBoundedMemory( - () -> (double) original.getTotalMemoryPerSlot(), - () -> { - if (config.getString(BOUNDED_MEMORY_SIZE) != null) { - setTotalMemoryPerSlot(config.getString(BOUNDED_MEMORY_SIZE)); - } else { - // we still left total memory size per slot as undefined if no actual settings. 
- this.totalMemoryPerSlot = UNDEFINED_VALUE; - } - }, - () -> setTotalMemoryPerSlot(String.valueOf(original.totalMemoryPerSlot))); - - configureBoundedMemory( - original::getWriteBufferRatio, - () -> setWriteBufferRatio(config.getDouble(WRITE_BUFFER_RATIO)), - () -> setWriteBufferRatio(original.writeBufferRatio)); - - configureBoundedMemory( - original::getHighPriPoolRatio, - () -> setHighPriPoolRatio(config.getDouble(HIGH_PRI_POOL_RATIO)), - () -> setHighPriPoolRatio(original.highPriPoolRatio)); - - if (isBoundedMemoryEnabled()) { - Preconditions.checkArgument(this.writeBufferRatio + this.highPriPoolRatio < 1.0, - String.format("Illegal sum of writeBufferRatio %s and highPriPoolRatio %s, should be less than 1.0", this.writeBufferRatio, this.highPriPoolRatio)); - } + this.memoryConfiguration = RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration, config); + this.memoryConfiguration.validate(); final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); @@ -412,14 +368,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } } - private void configureBoundedMemory(Supplier<Double> getOriginalValue, Runnable parseFromConfig, Runnable setFromOriginal) { - if (getOriginalValue.get() == UNDEFINED_VALUE) { - parseFromConfig.run(); - } else { - setFromOriginal.run(); - } - } - // ------------------------------------------------------------------------ // Reconfiguration // ------------------------------------------------------------------------ @@ -558,24 +506,16 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu DBOptions dbOptions = getDbOptions(); Function<String, ColumnFamilyOptions> createColumnOptions; - if (isBoundedMemoryEnabled()) { - RocksDBSharedObjects rocksDBSharedObjects; - MemoryManager memoryManager = env.getMemoryManager(); - // only initialized LRUCache and write buffer manager once. 
- // we would not dispose the cache and write buffer manager during disposing state backend - // as the same objects are shared by every RocksDB instance per slot. - while ((rocksDBSharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject()) == null) { - if (memoryManager.getLazyInitializeSharedObjectHelper().compareAndSet(false, true)) { - Cache lruCache = new LRUCache(getTotalMemoryPerSlot(), -1, false, getHighPriPoolRatio()); - WriteBufferManager writeBufferManager = new WriteBufferManager((long) (getTotalMemoryPerSlot() * getWriteBufferRatio()), lruCache); - - rocksDBSharedObjects = new RocksDBSharedObjects(lruCache, writeBufferManager); - memoryManager.setStateBackendSharedObject(rocksDBSharedObjects); - } - } + final OpaqueMemoryResource<RocksDBSharedResources> sharedResources = RocksDBOperationUtils + .allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), LOG); - Cache blockCache = checkNotNull(rocksDBSharedObjects.getCache()); - dbOptions.setWriteBufferManager(checkNotNull(rocksDBSharedObjects.getWriteBufferManager())); + if (sharedResources != null) { + LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); + + final RocksDBSharedResources rocksResources = sharedResources.getResourceHandle(); + final Cache blockCache = rocksResources.getCache(); + + dbOptions.setWriteBufferManager(rocksResources.getWriteBufferManager()); createColumnOptions = stateName -> { ColumnFamilyOptions columnOptions = getColumnOptions(); @@ -614,7 +554,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu stateHandles, keyGroupCompressionDecorator, cancelStreamRegistry - ).setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) + ) + .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled()) .setNumberOfTransferingThreads(getNumberOfTransferThreads()) 
.setNativeMetricOptions(getMemoryWatcherOptions()); @@ -690,6 +631,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu // ------------------------------------------------------------------------ /** + * Gets the memory configuration object, which offers settings to control RocksDB's memory usage. + */ + public RocksDBMemoryConfiguration getMemoryConfiguration() { + return memoryConfiguration; + } + + /** * Sets the path where the RocksDB local database files should be stored on the local * file system. Setting this path overrides the default behavior, where the * files are stored across the configured temp directories. @@ -808,13 +756,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } /** - * Gets whether bounding memory is enabled for this state backend. - */ - public boolean isBoundedMemoryEnabled() { - return totalMemoryPerSlot > 0; - } - - /** * Enable compaction filter to cleanup state with TTL is enabled. * * <p>Note: User can still decide in state TTL configuration in state descriptor @@ -963,34 +904,6 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu setNumberOfTransferThreads(numberOfTransferingThreads); } - public void setTotalMemoryPerSlot(String totalMemoryPerSlotStr) { - long totalMemoryPerSlot = MemorySize.parseBytes(totalMemoryPerSlotStr); - Preconditions.checkArgument(totalMemoryPerSlot > 0, String.format("Illegal total memory per slot %s for RocksDBs.", totalMemoryPerSlot)); - this.totalMemoryPerSlot = totalMemoryPerSlot; - } - - public long getTotalMemoryPerSlot() { - return totalMemoryPerSlot; - } - - public void setWriteBufferRatio(double writeBufferRatio) { - Preconditions.checkArgument(writeBufferRatio > 0 && writeBufferRatio < 1.0, String.format("Illegal write buffer ratio %s for RocksDBs.", writeBufferRatio)); - this.writeBufferRatio = writeBufferRatio; - } - - public double getWriteBufferRatio() { - return writeBufferRatio; - } - - 
public void setHighPriPoolRatio(double highPriPoolRatio) { - Preconditions.checkArgument(highPriPoolRatio > 0 && highPriPoolRatio < 1.0, String.format("Illegal high priority pool ratio %s for RocksDBs.", highPriPoolRatio)); - this.highPriPoolRatio = highPriPoolRatio; - } - - public double getHighPriPoolRatio() { - return highPriPoolRatio; - } - // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java deleted file mode 100644 index c564eb4..0000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.memory.MemoryManagerBuilder; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.Cache; -import org.rocksdb.WriteBufferManager; - -import java.io.IOException; -import java.util.Collections; - -import static org.junit.Assert.fail; - -/** - * Tests to verify memory bounded for rocksDB state backend. 
- */ -public class RocksDBStateBackendBoundedMemoryTest { - - private static final int MEMORY_SIZE = 64 * 1024 * 1024; // 64 MiBytes - - private MemoryManager memoryManager; - - @Rule - public final TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void setUp() { - this.memoryManager = MemoryManagerBuilder - .newBuilder() - .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE) - .build(); - } - - @After - public void shutDown() { - this.memoryManager.shutdown(); - } - - @Test - public void testSharedObjectsInitializeOnlyOnce() throws IOException { - DummyEnvironment env = new DummyEnvironment(); - env.setMemoryManager(memoryManager); - - Assert.assertNull(memoryManager.getStateBackendSharedObject()); - - Configuration configuration = new Configuration(); - configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); - RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); - originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); - - RocksDBStateBackend rocksDBStateBackend1 = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); - AbstractKeyedStateBackend keyedStateBackend1 = createKeyedStateBackend(rocksDBStateBackend1, env); - - Assert.assertTrue("The shared object should be a RocksDBSharedObject instance but actually not", - memoryManager.getStateBackendSharedObject() instanceof RocksDBSharedObjects); - RocksDBSharedObjects sharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); - - Cache lruCache = sharedObjects.getCache(); - WriteBufferManager writeBufferManager = sharedObjects.getWriteBufferManager(); - - RocksDBStateBackend rocksDBStateBackend2 = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); - AbstractKeyedStateBackend keyedStateBackend2 = createKeyedStateBackend(rocksDBStateBackend2, env); - - // Another keyed state backend is created but only 
initialized once for cache and write buffer manager. - sharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); - - try { - Assert.assertEquals(lruCache, sharedObjects.getCache()); - Assert.assertEquals(writeBufferManager, sharedObjects.getWriteBufferManager()); - } finally { - keyedStateBackend1.close(); - keyedStateBackend2.close(); - } - } - - @Test - public void testSharedObjectsNotClosedAfterKeyedStateBackendClosed() throws IOException { - DummyEnvironment env = new DummyEnvironment(); - env.setMemoryManager(memoryManager); - Assert.assertNull(memoryManager.getStateBackendSharedObject()); - - Configuration configuration = new Configuration(); - configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); - RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); - originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); - RocksDBStateBackend rocksDBStateBackend = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); - AbstractKeyedStateBackend keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, env); - - Assert.assertTrue("The shared object should be a RocksDBSharedObject instance but actually not", - memoryManager.getStateBackendSharedObject() instanceof RocksDBSharedObjects); - - RocksDBSharedObjects stateBackendSharedObject = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); - Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle()); - Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle()); - - keyedStateBackend.close(); - // even keyed state backend closed, cache and write buffer manager would not be disposed. 
- Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle()); - Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle()); - } - - @Test - public void testSharedObjectsInitializedConcurrently() throws IOException { - DummyEnvironment env = new DummyEnvironment(); - env.setMemoryManager(memoryManager); - - Assert.assertNull(memoryManager.getStateBackendSharedObject()); - - Configuration configuration = new Configuration(); - configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); - RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); - originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); - - int numThreads = 4; - Thread[] threads = new Thread[numThreads]; - RocksDBKeyedStateBackend[] keyedStateBackends = new RocksDBKeyedStateBackend[numThreads]; - for (int i = 0; i < numThreads; i++) { - int index = i; - threads[index] = new Thread(() -> { - RocksDBStateBackend rocksDBStateBackend = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); - try { - keyedStateBackends[index] = (RocksDBKeyedStateBackend) createKeyedStateBackend(rocksDBStateBackend, env); - } catch (IOException ignored) { - } - }); - threads[i].setDaemon(true); - } - - // launch concurrently - for (int i = 0; i < numThreads; i++) { - threads[i].start(); - } - - try { - // sync - for (int i = 0; i < numThreads; i++) { - threads[i].join(5000); - } - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - WriteBufferManager writeBufferManager = null; - for (RocksDBKeyedStateBackend keyedStateBackend : keyedStateBackends) { - if (writeBufferManager == null) { - writeBufferManager = keyedStateBackend.getDbOptions().writeBufferManager(); - } else { - // different keyed state backends but share the same write buffer manager - Assert.assertEquals(writeBufferManager, keyedStateBackend.getDbOptions().writeBufferManager()); - } - 
keyedStateBackend.close(); - } - } - - private AbstractKeyedStateBackend createKeyedStateBackend(RocksDBStateBackend rocksDBStateBackend, Environment env) throws IOException { - return rocksDBStateBackend.createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry(), - TtlTimeProvider.DEFAULT, - new UnregisteredMetricsGroup(), - Collections.emptyList(), - new CloseableRegistry()); - - } -} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 9cd338f..ad52f18 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -64,6 +64,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -637,36 +638,35 @@ public class RocksDBStateBackendConfigTest { @Test public void testDefaultMemoryControlParameters() { - StateBackend storageBackend = new MemoryStateBackend(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); - assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, rocksDbBackend.getTotalMemoryPerSlot()); - assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, rocksDbBackend.getHighPriPoolRatio(), 0.0); - assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getWriteBufferRatio(), 0.0); - - RocksDBStateBackend configure = rocksDbBackend.configure(new Configuration(), Thread.currentThread().getContextClassLoader()); - assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, configure.getTotalMemoryPerSlot()); - assertEquals(RocksDBOptions.HIGH_PRI_POOL_RATIO.defaultValue(), configure.getHighPriPoolRatio(), 0.0); - assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), configure.getWriteBufferRatio(), 0.0); + RocksDBMemoryConfiguration memSettings = new RocksDBMemoryConfiguration(); + assertFalse(memSettings.isUsingFixedMemoryPerSlot()); + assertEquals(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(), memSettings.getHighPriorityPoolRatio(), 0.0); + assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), memSettings.getWriteBufferRatio(), 0.0); + + RocksDBMemoryConfiguration configured = RocksDBMemoryConfiguration.fromOtherAndConfiguration(memSettings, new Configuration()); + assertFalse(configured.isUsingFixedMemoryPerSlot()); + assertEquals(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(), configured.getHighPriorityPoolRatio(), 0.0); + assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), configured.getWriteBufferRatio(), 0.0); } @Test public void testConfigureIllegalMemoryControlParameters() { - StateBackend storageBackend = new MemoryStateBackend(); - RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); + RocksDBMemoryConfiguration memSettings = new RocksDBMemoryConfiguration(); + + verifySetParameter(() -> memSettings.setFixedMemoryPerSlot("-1B")); + verifySetParameter(() -> memSettings.setHighPriorityPoolRatio(-0.1)); + verifySetParameter(() -> memSettings.setHighPriorityPoolRatio(1.1)); + verifySetParameter(() -> memSettings.setWriteBufferRatio(-0.1)); + verifySetParameter(() -> memSettings.setWriteBufferRatio(1.1)); - verifySetParameter(() -> rocksDbBackend.setTotalMemoryPerSlot("-1B")); - verifySetParameter(() -> 
rocksDbBackend.setHighPriPoolRatio(-0.1)); - verifySetParameter(() -> rocksDbBackend.setHighPriPoolRatio(1.1)); - verifySetParameter(() -> rocksDbBackend.setWriteBufferRatio(-0.1)); - verifySetParameter(() -> rocksDbBackend.setWriteBufferRatio(1.1)); + memSettings.setFixedMemoryPerSlot("128MB"); + memSettings.setWriteBufferRatio(0.6); + memSettings.setHighPriorityPoolRatio(0.6); - rocksDbBackend.setTotalMemoryPerSlot("128MB"); - rocksDbBackend.setWriteBufferRatio(0.6); - rocksDbBackend.setHighPriPoolRatio(0.6); try { // sum of writeBufferRatio and highPriPoolRatio larger than 1.0 - rocksDbBackend.configure(new Configuration(), Thread.currentThread().getContextClassLoader()); - fail("No expected IllegalArgumentException."); + memSettings.validate(); + fail("Expected an IllegalArgumentException."); } catch (IllegalArgumentException expected) { // expected exception }