This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06fb0ec1f65056fd0ef5b67013ca9ae288486961
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Dec 5 19:44:22 2019 +0100

    [FLINK-14484][state-backend] Some refactorings for the RocksDB memory 
controlling code
    
    This closes #10449
---
 .../generated/rocks_db_configuration.html          |  24 +++
 .../state/api/runtime/SavepointEnvironment.java    |   5 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |   5 -
 .../state/RocksDBMemoryConfiguration.java          | 198 ++++++++++++++++++++
 .../streaming/state/RocksDBOperationUtils.java     |  49 +++++
 .../contrib/streaming/state/RocksDBOptions.java    |  43 +++--
 ...redObjects.java => RocksDBSharedResources.java} |  22 ++-
 .../streaming/state/RocksDBStateBackend.java       | 135 +++-----------
 .../RocksDBStateBackendBoundedMemoryTest.java      | 204 ---------------------
 .../state/RocksDBStateBackendConfigTest.java       |  44 ++---
 10 files changed, 364 insertions(+), 365 deletions(-)

diff --git a/docs/_includes/generated/rocks_db_configuration.html 
b/docs/_includes/generated/rocks_db_configuration.html
index a367b78..660bf8d 100644
--- a/docs/_includes/generated/rocks_db_configuration.html
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -21,6 +21,30 @@
             <td>The local directory (on the TaskManager) where RocksDB puts 
its files.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.rocksdb.memory.fixed-per-slot</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>The fixed total amount of memory, shared among all RocksDB 
instances per slot. This option overrides the 
'state.backend.rocksdb.memory.managed' option when configured. If neither this 
option, nor the 'state.backend.rocksdb.memory.managed' optionare set, then each 
RocksDB column family state has its own memory caches (as controlled by the 
column family options).</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.1</td>
+            <td>Double</td>
+            <td>The fraction of cache memory that is reserved for 
high-priority data like index, filter, and compression dictionary blocks. This 
option only has an effect when 'state.backend.rocksdb.memory.managed' or 
'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.memory.managed</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If set, the RocksDB state backend will automatically configure 
itself to use the managed memory budget of the task slot, and divide the memory 
over write buffers, indexes, block caches, etc. That way, the state backend 
will not exceed the available memory, but use as much memory as it can.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.5</td>
+            <td>Double</td>
+            <td>The maximum amount of memory that write buffers may take, as a 
fraction of the total cache memory. This option only has an effect when 
'state.backend.rocksdb.memory.managed' or 
'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.options-factory</h5></td>
             <td style="word-wrap: 
break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
             <td>String</td>
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 625e95a..a7a1db7 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -81,6 +81,8 @@ public class SavepointEnvironment implements Environment {
 
        private final IOManager ioManager;
 
+       private final MemoryManager memoryManager;
+
        private final AccumulatorRegistry accumulatorRegistry;
 
        private SavepointEnvironment(RuntimeContext ctx, Configuration 
configuration, int maxParallelism, int indexOfSubtask, 
PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState) {
@@ -97,6 +99,7 @@ public class SavepointEnvironment implements Environment {
                this.registry = new KvStateRegistry().createTaskRegistry(jobID, 
vertexID);
                this.taskStateManager = new 
SavepointTaskStateManager(prioritizedOperatorSubtaskState);
                this.ioManager = new IOManagerAsync();
+               this.memoryManager = MemoryManager.forDefaultPageSize(64 * 1024 
* 1024);
                this.accumulatorRegistry = new AccumulatorRegistry(jobID, 
attemptID);
        }
 
@@ -162,7 +165,7 @@ public class SavepointEnvironment implements Environment {
 
        @Override
        public MemoryManager getMemoryManager() {
-               throw new UnsupportedOperationException(ERROR_MSG);
+               return memoryManager;
        }
 
        @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 07b1fd8..16c8757 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -403,11 +403,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @VisibleForTesting
-       DBOptions getDbOptions() {
-               return dbOptions;
-       }
-
-       @VisibleForTesting
        boolean isDisposed() {
                return this.disposed;
        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
new file mode 100644
index 0000000..f3f9442
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The settings regarding RocksDBs memory usage.
+ */
+public final class RocksDBMemoryConfiguration implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Flag whether to use the managed memory budget for RocksDB. Null is 
not set. */
+       @Nullable
+       private Boolean useManagedMemory;
+
+       /**The total memory for all RocksDB instances at this slot. Null is not 
set. */
+       @Nullable
+       private MemorySize fixedMemoryPerSlot;
+
+       /** The maximum fraction of the shared cache consumed by the write 
buffers. Null if not set.*/
+       @Nullable
+       private Double writeBufferRatio;
+
+       /** The high priority pool ratio in the shared cache, used for index & 
filter blocks. Null if not set.*/
+       @Nullable
+       private Double highPriorityPoolRatio;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Configures RocksDB to use the managed memory of a slot.
+        * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details.
+        */
+       public void setUseManagedMemory(boolean useManagedMemory) {
+               this.useManagedMemory = useManagedMemory;
+       }
+
+       /**
+        * Configures RocksDB to use a fixed amount of memory shared between 
all instances (operators) in a slot.
+        * See {@link RocksDBOptions#FIX_PER_SLOT_MEMORY_SIZE} for details.
+        */
+       public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot) {
+               checkArgument(fixedMemoryPerSlot == null || 
fixedMemoryPerSlot.getBytes() > 0,
+                       "Total memory per slot must be > 0");
+
+               this.fixedMemoryPerSlot = fixedMemoryPerSlot;
+       }
+
+       /**
+        * Configures RocksDB to use a fixed amount of memory shared between 
all instances (operators) in a slot.
+        * See {@link #setFixedMemoryPerSlot(MemorySize)} for details.
+        */
+       public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr) {
+               setFixedMemoryPerSlot(MemorySize.parse(totalMemoryPerSlotStr));
+       }
+
+       /**
+        * Sets the fraction of the total memory to be used for write buffers.
+        * This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+        * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+        *
+        * <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details.
+        */
+       public void setWriteBufferRatio(double writeBufferRatio) {
+               Preconditions.checkArgument(writeBufferRatio > 0 && 
writeBufferRatio < 1.0,
+                       "Write Buffer ratio %s must be in (0, 1)", 
writeBufferRatio);
+               this.writeBufferRatio = writeBufferRatio;
+       }
+
+       /**
+        * Sets the fraction of the total memory to be used for high priority 
blocks like indexes, dictionaries, etc.
+        * This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+        * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+        *
+        * <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details.
+        */
+       public void setHighPriorityPoolRatio(double highPriorityPoolRatio) {
+               Preconditions.checkArgument(highPriorityPoolRatio > 0 && 
highPriorityPoolRatio < 1.0,
+                       "High priority pool ratio %s must be in (0, 1)", 
highPriorityPoolRatio);
+               this.highPriorityPoolRatio = highPriorityPoolRatio;
+       }
+
+       /**
+        * Gets whether the state backend is configured to use the managed 
memory of a slot for RocksDB.
+        * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details.
+        */
+       public boolean isUsingManagedMemory() {
+               return useManagedMemory != null ? useManagedMemory : 
RocksDBOptions.USE_MANAGED_MEMORY.defaultValue();
+       }
+
+       /**
+        * Gets whether the state backend is configured to use a fixed amount 
of memory shared between all
+        * RocksDB instances (in all tasks and operators) of a slot.
+        * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details.
+        */
+       public boolean isUsingFixedMemoryPerSlot() {
+               return fixedMemoryPerSlot != null;
+       }
+
+       /**
+        * Gets the fixed amount of memory to be shared between all RocksDB 
instances (in all tasks and
+        * operators) of a slot. Null is not configured.
+        * See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details.
+        */
+       @Nullable
+       public MemorySize getFixedMemoryPerSlot() {
+               return fixedMemoryPerSlot;
+       }
+
+       /**
+        * Gets the fraction of the total memory to be used for write buffers.
+        * This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+        * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+        *
+        * <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details.
+        */
+       public double getWriteBufferRatio() {
+               return writeBufferRatio != null ? writeBufferRatio : 
RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue();
+       }
+
+       /**
+        * Gets the fraction of the total memory to be used for high priority 
blocks like indexes, dictionaries, etc.
+        * This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+        * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+        *
+        * <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details.
+        */
+       public double getHighPriorityPoolRatio() {
+               return highPriorityPoolRatio != null ? highPriorityPoolRatio : 
RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Validates if the configured options are valid with respect to one 
another.
+        */
+       public void validate() {
+               if (writeBufferRatio != null && highPriorityPoolRatio != null 
&& writeBufferRatio + highPriorityPoolRatio > 1.0) {
+                       throw new IllegalArgumentException(String.format(
+                               "Invalid configuration: Sum of writeBufferRatio 
%s and highPriPoolRatio %s should be less than 1.0",
+                               writeBufferRatio, highPriorityPoolRatio));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Derives a RocksDBMemoryConfiguration from another object and a 
configuration.
+        * The values set on the other object take precedence, and the values 
from the configuration are
+        * used if no values are set on the other config object.
+        */
+       public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
+                       RocksDBMemoryConfiguration other,
+                       Configuration config) {
+
+               final RocksDBMemoryConfiguration newConfig = new 
RocksDBMemoryConfiguration();
+
+               newConfig.fixedMemoryPerSlot = other.fixedMemoryPerSlot != null
+                               ? other.fixedMemoryPerSlot
+                               : 
config.get(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE);
+
+               newConfig.writeBufferRatio = other.writeBufferRatio != null
+                               ? other.writeBufferRatio
+                               : 
config.getDouble(RocksDBOptions.WRITE_BUFFER_RATIO);
+
+               newConfig.highPriorityPoolRatio = other.highPriorityPoolRatio 
!= null
+                       ? other.highPriorityPoolRatio
+                       : 
config.getDouble(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO);
+
+               return newConfig;
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1e3fb8f..1f570ff 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -19,17 +19,24 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.configuration.ConfigConstants;
 import 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.LongFunctionWithException;
 
+import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.LRUCache;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 
@@ -46,6 +53,11 @@ import static 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.
  * Utils for RocksDB Operations.
  */
 public class RocksDBOperationUtils {
+
+       private static final String MANAGED_MEMORY_RESOURCE_ID = 
"state-rocks-managed-memory";
+
+       private static final String FIXED_SLOT_MEMORY_RESOURCE_ID = 
"state-rocks-fixed-slot-memory";
+
        public static RocksDB openDB(
                String path,
                List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
@@ -164,4 +176,41 @@ public class RocksDBOperationUtils {
                        // ignore
                }
        }
+
+       @Nullable
+       public static OpaqueMemoryResource<RocksDBSharedResources> 
allocateSharedCachesIfConfigured(
+                       RocksDBMemoryConfiguration memoryConfig,
+                       MemoryManager memoryManager,
+                       Logger logger) throws IOException {
+
+               if (!memoryConfig.isUsingFixedMemoryPerSlot() && 
!memoryConfig.isUsingManagedMemory()) {
+                       return null;
+               }
+
+               final double highPriorityPoolRatio = 
memoryConfig.getHighPriorityPoolRatio();
+               final double writeBufferRatio = 
memoryConfig.getWriteBufferRatio();
+
+               final LongFunctionWithException<RocksDBSharedResources, 
Exception> allocator = (size) -> {
+                       final Cache cache = new LRUCache(size, -1, false, 
highPriorityPoolRatio);
+                       final WriteBufferManager wbm = new 
WriteBufferManager((long) (writeBufferRatio * size), cache);
+                       return new RocksDBSharedResources(cache, wbm);
+               };
+
+               try {
+                       if (memoryConfig.isUsingFixedMemoryPerSlot()) {
+                               assert memoryConfig.getFixedMemoryPerSlot() != 
null;
+
+                               logger.info("Getting fixed-size shared cache 
for RocksDB.");
+                               return 
memoryManager.getExternalSharedMemoryResource(
+                                               FIXED_SLOT_MEMORY_RESOURCE_ID, 
allocator, memoryConfig.getFixedMemoryPerSlot().getBytes());
+                       }
+                       else {
+                               logger.info("Getting managed memory shared 
cache for RocksDB.");
+                               return 
memoryManager.getSharedMemoryResourceForManagedMemory(MANAGED_MEMORY_RESOURCE_ID,
 allocator);
+                       }
+               }
+               catch (Exception e) {
+                       throw new IOException("Failed to acquire shared cache 
resource for RocksDB", e);
+               }
+       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 928882f..5f438bc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
 
 import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.DEFAULT;
 import static 
org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED;
@@ -86,24 +87,42 @@ public class RocksDBOptions {
                                "The default options factory is %s, and it 
would read the configured options which provided in 
'RocksDBConfigurableOptions'.",
                                
DefaultConfigurableOptionsFactory.class.getName()));
 
-       public static final ConfigOption<String> BOUNDED_MEMORY_SIZE = 
ConfigOptions
-               .key("state.backend.rocksdb.per-slot.total.memory")
-               .stringType()
+       public static final ConfigOption<Boolean> USE_MANAGED_MEMORY = 
ConfigOptions
+               .key("state.backend.rocksdb.memory.managed")
+               .booleanType()
+               .defaultValue(false)
+               .withDescription("If set, the RocksDB state backend will 
automatically configure itself to use the " +
+                       "managed memory budget of the task slot, and divide the 
memory over write buffers, indexes, " +
+                       "block caches, etc. That way, the state backend will 
not exceed the available memory, but use as much " +
+                       "memory as it can.");
+
+       public static final ConfigOption<MemorySize> FIX_PER_SLOT_MEMORY_SIZE = 
ConfigOptions
+               .key("state.backend.rocksdb.memory.fixed-per-slot")
+               .memoryType()
                .noDefaultValue()
-               .withDescription("The total memory size shared among all 
RocksDB instances per slot. " +
-                       "This option has no default value which means no 
bounded memory limit.");
+               .withDescription(String.format(
+                       "The fixed total amount of memory, shared among all 
RocksDB instances per slot. " +
+                       "This option overrides the '%s' option when configured. 
If neither this option, nor the '%s' option" +
+                       "are set, then each RocksDB column family state has its 
own memory caches (as controlled by the column " +
+                       "family options).", USE_MANAGED_MEMORY.key(), 
USE_MANAGED_MEMORY.key()));
 
        public static final ConfigOption<Double> WRITE_BUFFER_RATIO = 
ConfigOptions
-               .key("state.backend.rocksdb.write-buffer.ratio")
+               .key("state.backend.rocksdb.memory.write-buffer-ratio")
                .doubleType()
                .defaultValue(0.5)
-               .withDescription(String.format("This option would only take 
effect when %s is configured, " +
-                       "all RocksDB instances would share the same write 
buffer manager with the ratio of a LRUCache.", BOUNDED_MEMORY_SIZE));
+               .withDescription(String.format(
+                       "The maximum amount of memory that write buffers may 
take, as a fraction of the total cache memory. " +
+                       "This option only has an effect when '%s' or '%s' are 
configured.",
+                       USE_MANAGED_MEMORY.key(),
+                       FIX_PER_SLOT_MEMORY_SIZE.key()));
 
-       public static final ConfigOption<Double> HIGH_PRI_POOL_RATIO = 
ConfigOptions
-               .key("state.backend.rocksdb.high-pri-pool.ratio")
+       public static final ConfigOption<Double> HIGH_PRIORITY_POOL_RATIO = 
ConfigOptions
+               .key("state.backend.rocksdb.memory.high-prio-pool-ratio")
                .doubleType()
                .defaultValue(0.1)
-               .withDescription(String.format("This option would only take 
effect when %s is configured, " +
-                       "all RocksDB instances would share the high priority 
ratio for index, filter, and compression dictionary blocks in a LRUCache.", 
BOUNDED_MEMORY_SIZE));
+               .withDescription(String.format(
+                               "The fraction of cache memory that is reserved 
for high-priority data like index, filter, and " +
+                               "compression dictionary blocks. This option 
only has an effect when '%s' or '%s' are configured.",
+                               USE_MANAGED_MEMORY.key(),
+                               FIX_PER_SLOT_MEMORY_SIZE.key()));
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
similarity index 78%
rename from 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
rename to 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
index 188c506..dbd46fa 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,24 +22,20 @@ import org.rocksdb.Cache;
 import org.rocksdb.WriteBufferManager;
 
 /**
- * Shared objects among RocksDB instances per slot.
+ * The set of resources that can be shared by all RocksDB instances in a slot.
+ * Sharing these resources helps RocksDB a predictable resource footprint.
  */
-public class RocksDBSharedObjects implements AutoCloseable {
+final class RocksDBSharedResources implements AutoCloseable {
 
        private final Cache cache;
+
        private final WriteBufferManager writeBufferManager;
 
-       RocksDBSharedObjects(Cache cache, WriteBufferManager 
writeBufferManager) {
+       RocksDBSharedResources(Cache cache, WriteBufferManager 
writeBufferManager) {
                this.cache = cache;
                this.writeBufferManager = writeBufferManager;
        }
 
-       @Override
-       public void close() {
-               writeBufferManager.close();
-               cache.close();
-       }
-
        public Cache getCache() {
                return cache;
        }
@@ -47,4 +43,10 @@ public class RocksDBSharedObjects implements AutoCloseable {
        public WriteBufferManager getWriteBufferManager() {
                return writeBufferManager;
        }
+
+       @Override
+       public void close() {
+               writeBufferManager.close();
+               cache.close();
+       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 90e8583..2dcf5c2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -56,11 +55,9 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
 import org.rocksdb.TableFormatConfig;
-import org.rocksdb.WriteBufferManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,14 +75,10 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
-import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.BOUNDED_MEMORY_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
-import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.HIGH_PRI_POOL_RATIO;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
-import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -123,8 +116,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
 
        private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;
 
-       static final long UNDEFINED_VALUE = -1;
-
        // 
------------------------------------------------------------------------
 
        // -- configuration values, set in the application / configuration
@@ -160,17 +151,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        private TernaryBoolean enableTtlCompactionFilter;
 
-       /**
-        * Total memory for all rocksDB instances at this slot.
-        * Currently, we would use a LRU cache to serve as the total memory 
usage.
-        */
-       private long totalMemoryPerSlot;
-
-       /** The write buffer ratio which consumed by write-buffer manager from 
the shared cache. */
-       private double writeBufferRatio;
-
-       /** The high priority pool ratio which consumed by index&filter from 
the shared cache. */
-       private double highPriPoolRatio;
+       /** The configuration for memory settings (pool sizes, etc.). */
+       private final RocksDBMemoryConfiguration memoryConfiguration;
 
        /** This determines the type of priority queue state. */
        private final PriorityQueueStateType priorityQueueStateType;
@@ -293,9 +275,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.priorityQueueStateType = PriorityQueueStateType.HEAP;
                this.defaultMetricOptions = new RocksDBNativeMetricOptions();
                this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
-               this.totalMemoryPerSlot = UNDEFINED_VALUE;
-               this.writeBufferRatio = UNDEFINED_VALUE;
-               this.highPriPoolRatio = UNDEFINED_VALUE;
+               this.memoryConfiguration = new RocksDBMemoryConfiguration();
        }
 
        /**
@@ -341,32 +321,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter
                        
.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
-               configureBoundedMemory(
-                       () -> (double) original.getTotalMemoryPerSlot(),
-                       () -> {
-                               if (config.getString(BOUNDED_MEMORY_SIZE) != 
null) {
-                                       
setTotalMemoryPerSlot(config.getString(BOUNDED_MEMORY_SIZE));
-                               } else {
-                                       // we still left total memory size per 
slot as undefined if no actual settings.
-                                       this.totalMemoryPerSlot = 
UNDEFINED_VALUE;
-                               }
-                       },
-                       () -> 
setTotalMemoryPerSlot(String.valueOf(original.totalMemoryPerSlot)));
-
-               configureBoundedMemory(
-                       original::getWriteBufferRatio,
-                       () -> 
setWriteBufferRatio(config.getDouble(WRITE_BUFFER_RATIO)),
-                       () -> setWriteBufferRatio(original.writeBufferRatio));
-
-               configureBoundedMemory(
-                       original::getHighPriPoolRatio,
-                       () -> 
setHighPriPoolRatio(config.getDouble(HIGH_PRI_POOL_RATIO)),
-                       () -> setHighPriPoolRatio(original.highPriPoolRatio));
-
-               if (isBoundedMemoryEnabled()) {
-                       Preconditions.checkArgument(this.writeBufferRatio + 
this.highPriPoolRatio < 1.0,
-                               String.format("Illegal sum of writeBufferRatio 
%s and highPriPoolRatio %s, should be less than 1.0", this.writeBufferRatio, 
this.highPriPoolRatio));
-               }
+               this.memoryConfiguration = 
RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration,
 config);
+               this.memoryConfiguration.validate();
 
                final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
 
@@ -412,14 +368,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                }
        }
 
-       private void configureBoundedMemory(Supplier<Double> getOriginalValue, 
Runnable parseFromConfig, Runnable setFromOriginal) {
-               if (getOriginalValue.get() == UNDEFINED_VALUE) {
-                       parseFromConfig.run();
-               } else {
-                       setFromOriginal.run();
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Reconfiguration
        // 
------------------------------------------------------------------------
@@ -558,24 +506,16 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                DBOptions dbOptions = getDbOptions();
                Function<String, ColumnFamilyOptions> createColumnOptions;
 
-               if (isBoundedMemoryEnabled()) {
-                       RocksDBSharedObjects rocksDBSharedObjects;
-                       MemoryManager memoryManager = env.getMemoryManager();
-                       // only initialized LRUCache and write buffer manager 
once.
-                       // we would not dispose the cache and write buffer 
manager during disposing state backend
-                       // as the same objects are shared by every RocksDB 
instance per slot.
-                       while ((rocksDBSharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject()) == null) {
-                               if 
(memoryManager.getLazyInitializeSharedObjectHelper().compareAndSet(false, 
true)) {
-                                       Cache lruCache = new 
LRUCache(getTotalMemoryPerSlot(), -1, false, getHighPriPoolRatio());
-                                       WriteBufferManager writeBufferManager = 
new WriteBufferManager((long) (getTotalMemoryPerSlot() * 
getWriteBufferRatio()), lruCache);
-
-                                       rocksDBSharedObjects = new 
RocksDBSharedObjects(lruCache, writeBufferManager);
-                                       
memoryManager.setStateBackendSharedObject(rocksDBSharedObjects);
-                               }
-                       }
+               final OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources = RocksDBOperationUtils
+                               
.allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), 
LOG);
 
-                       Cache blockCache = 
checkNotNull(rocksDBSharedObjects.getCache());
-                       
dbOptions.setWriteBufferManager(checkNotNull(rocksDBSharedObjects.getWriteBufferManager()));
+               if (sharedResources != null) {
+                       LOG.info("Obtained shared RocksDB cache of size {} 
bytes", sharedResources.getSize());
+
+                       final RocksDBSharedResources rocksResources = 
sharedResources.getResourceHandle();
+                       final Cache blockCache = rocksResources.getCache();
+
+                       
dbOptions.setWriteBufferManager(rocksResources.getWriteBufferManager());
 
                        createColumnOptions = stateName -> {
                                ColumnFamilyOptions columnOptions = 
getColumnOptions();
@@ -614,7 +554,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        stateHandles,
                        keyGroupCompressionDecorator,
                        cancelStreamRegistry
-               
).setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
+               )
+                       
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                        
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
                        
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
                        .setNativeMetricOptions(getMemoryWatcherOptions());
@@ -690,6 +631,13 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        // 
------------------------------------------------------------------------
 
        /**
+        * Gets the memory configuration object, which offers settings to 
control RocksDB's memory usage.
+        */
+       public RocksDBMemoryConfiguration getMemoryConfiguration() {
+               return memoryConfiguration;
+       }
+
+       /**
         * Sets the path where the RocksDB local database files should be 
stored on the local
         * file system. Setting this path overrides the default behavior, where 
the
         * files are stored across the configured temp directories.
@@ -808,13 +756,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        }
 
        /**
-        * Gets whether bounding memory is enabled for this state backend.
-        */
-       public boolean isBoundedMemoryEnabled() {
-               return totalMemoryPerSlot > 0;
-       }
-
-       /**
         * Enable compaction filter to cleanup state with TTL is enabled.
         *
         * <p>Note: User can still decide in state TTL configuration in state 
descriptor
@@ -963,34 +904,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                setNumberOfTransferThreads(numberOfTransferingThreads);
        }
 
-       public void setTotalMemoryPerSlot(String totalMemoryPerSlotStr) {
-               long totalMemoryPerSlot = 
MemorySize.parseBytes(totalMemoryPerSlotStr);
-               Preconditions.checkArgument(totalMemoryPerSlot > 0, 
String.format("Illegal total memory per slot %s for RocksDBs.", 
totalMemoryPerSlot));
-               this.totalMemoryPerSlot = totalMemoryPerSlot;
-       }
-
-       public long getTotalMemoryPerSlot() {
-               return totalMemoryPerSlot;
-       }
-
-       public void setWriteBufferRatio(double writeBufferRatio) {
-               Preconditions.checkArgument(writeBufferRatio > 0 && 
writeBufferRatio < 1.0, String.format("Illegal write buffer ratio %s for 
RocksDBs.", writeBufferRatio));
-               this.writeBufferRatio = writeBufferRatio;
-       }
-
-       public double getWriteBufferRatio() {
-               return writeBufferRatio;
-       }
-
-       public void setHighPriPoolRatio(double highPriPoolRatio) {
-               Preconditions.checkArgument(highPriPoolRatio > 0 && 
highPriPoolRatio < 1.0, String.format("Illegal high priority pool ratio %s for 
RocksDBs.", highPriPoolRatio));
-               this.highPriPoolRatio = highPriPoolRatio;
-       }
-
-       public double getHighPriPoolRatio() {
-               return highPriPoolRatio;
-       }
-
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
deleted file mode 100644
index c564eb4..0000000
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.memory.MemoryManagerBuilder;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.Cache;
-import org.rocksdb.WriteBufferManager;
-
-import java.io.IOException;
-import java.util.Collections;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests to verify memory bounded for rocksDB state backend.
- */
-public class RocksDBStateBackendBoundedMemoryTest {
-
-       private static final int MEMORY_SIZE = 64 * 1024 * 1024; // 64 MiBytes
-
-       private MemoryManager memoryManager;
-
-       @Rule
-       public final TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void setUp() {
-               this.memoryManager = MemoryManagerBuilder
-                       .newBuilder()
-                       .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE)
-                       .build();
-       }
-
-       @After
-       public void shutDown() {
-               this.memoryManager.shutdown();
-       }
-
-       @Test
-       public void testSharedObjectsInitializeOnlyOnce() throws IOException {
-               DummyEnvironment env = new DummyEnvironment();
-               env.setMemoryManager(memoryManager);
-
-               Assert.assertNull(memoryManager.getStateBackendSharedObject());
-
-               Configuration configuration = new Configuration();
-               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
-               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
-               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
-               RocksDBStateBackend rocksDBStateBackend1 = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
-               AbstractKeyedStateBackend keyedStateBackend1 = 
createKeyedStateBackend(rocksDBStateBackend1, env);
-
-               Assert.assertTrue("The shared object should be a 
RocksDBSharedObject instance but actually not",
-                       memoryManager.getStateBackendSharedObject() instanceof 
RocksDBSharedObjects);
-               RocksDBSharedObjects sharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject();
-
-               Cache lruCache = sharedObjects.getCache();
-               WriteBufferManager writeBufferManager = 
sharedObjects.getWriteBufferManager();
-
-               RocksDBStateBackend rocksDBStateBackend2 = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
-               AbstractKeyedStateBackend keyedStateBackend2 = 
createKeyedStateBackend(rocksDBStateBackend2, env);
-
-               // Another keyed state backend is created but only initialized 
once for cache and write buffer manager.
-               sharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject();
-
-               try {
-                       Assert.assertEquals(lruCache, sharedObjects.getCache());
-                       Assert.assertEquals(writeBufferManager, 
sharedObjects.getWriteBufferManager());
-               } finally {
-                       keyedStateBackend1.close();
-                       keyedStateBackend2.close();
-               }
-       }
-
-       @Test
-       public void testSharedObjectsNotClosedAfterKeyedStateBackendClosed() 
throws IOException {
-               DummyEnvironment env = new DummyEnvironment();
-               env.setMemoryManager(memoryManager);
-               Assert.assertNull(memoryManager.getStateBackendSharedObject());
-
-               Configuration configuration = new Configuration();
-               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
-               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
-               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-               RocksDBStateBackend rocksDBStateBackend = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
-               AbstractKeyedStateBackend keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, env);
-
-               Assert.assertTrue("The shared object should be a 
RocksDBSharedObject instance but actually not",
-                       memoryManager.getStateBackendSharedObject() instanceof 
RocksDBSharedObjects);
-
-               RocksDBSharedObjects stateBackendSharedObject = 
(RocksDBSharedObjects) memoryManager.getStateBackendSharedObject();
-               
Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle());
-               
Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle());
-
-               keyedStateBackend.close();
-               // even keyed state backend closed, cache and write buffer 
manager would not be disposed.
-               
Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle());
-               
Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle());
-       }
-
-       @Test
-       public void testSharedObjectsInitializedConcurrently() throws 
IOException {
-               DummyEnvironment env = new DummyEnvironment();
-               env.setMemoryManager(memoryManager);
-
-               Assert.assertNull(memoryManager.getStateBackendSharedObject());
-
-               Configuration configuration = new Configuration();
-               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
-               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
-               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
-               int numThreads = 4;
-               Thread[] threads = new Thread[numThreads];
-               RocksDBKeyedStateBackend[] keyedStateBackends = new 
RocksDBKeyedStateBackend[numThreads];
-               for (int i = 0; i < numThreads; i++) {
-                       int index = i;
-                       threads[index] = new Thread(() -> {
-                               RocksDBStateBackend rocksDBStateBackend = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
-                               try {
-                                       keyedStateBackends[index] = 
(RocksDBKeyedStateBackend) createKeyedStateBackend(rocksDBStateBackend, env);
-                               } catch (IOException ignored) {
-                               }
-                       });
-                       threads[i].setDaemon(true);
-               }
-
-               // launch concurrently
-               for (int i = 0; i < numThreads; i++) {
-                       threads[i].start();
-               }
-
-               try {
-                       // sync
-                       for (int i = 0; i < numThreads; i++) {
-                               threads[i].join(5000);
-                       }
-               } catch (InterruptedException e) {
-                       fail(e.getMessage());
-               }
-
-               WriteBufferManager writeBufferManager = null;
-               for (RocksDBKeyedStateBackend keyedStateBackend : 
keyedStateBackends) {
-                       if (writeBufferManager == null) {
-                               writeBufferManager = 
keyedStateBackend.getDbOptions().writeBufferManager();
-                       } else {
-                               // different keyed state backends but share the 
same write buffer manager
-                               Assert.assertEquals(writeBufferManager, 
keyedStateBackend.getDbOptions().writeBufferManager());
-                       }
-                       keyedStateBackend.close();
-               }
-       }
-
-       private AbstractKeyedStateBackend 
createKeyedStateBackend(RocksDBStateBackend rocksDBStateBackend, Environment 
env) throws IOException {
-               return rocksDBStateBackend.createKeyedStateBackend(
-                       env,
-                       env.getJobID(),
-                       "test_op",
-                       IntSerializer.INSTANCE,
-                       1,
-                       new KeyGroupRange(0, 0),
-                       env.getTaskKvStateRegistry(),
-                       TtlTimeProvider.DEFAULT,
-                       new UnregisteredMetricsGroup(),
-                       Collections.emptyList(),
-                       new CloseableRegistry());
-
-       }
-}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 9cd338f..ad52f18 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -64,6 +64,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -637,36 +638,35 @@ public class RocksDBStateBackendConfigTest {
 
        @Test
        public void testDefaultMemoryControlParameters() {
-               StateBackend storageBackend = new MemoryStateBackend();
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
-               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getTotalMemoryPerSlot());
-               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getHighPriPoolRatio(), 0.0);
-               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getWriteBufferRatio(), 0.0);
-
-               RocksDBStateBackend configure = rocksDbBackend.configure(new 
Configuration(), Thread.currentThread().getContextClassLoader());
-               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
configure.getTotalMemoryPerSlot());
-               assertEquals(RocksDBOptions.HIGH_PRI_POOL_RATIO.defaultValue(), 
configure.getHighPriPoolRatio(), 0.0);
-               assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), 
configure.getWriteBufferRatio(), 0.0);
+               RocksDBMemoryConfiguration memSettings = new 
RocksDBMemoryConfiguration();
+               assertFalse(memSettings.isUsingFixedMemoryPerSlot());
+               
assertEquals(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(), 
memSettings.getHighPriorityPoolRatio(), 0.0);
+               assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), 
memSettings.getWriteBufferRatio(), 0.0);
+
+               RocksDBMemoryConfiguration configured = 
RocksDBMemoryConfiguration.fromOtherAndConfiguration(memSettings, new 
Configuration());
+               assertFalse(configured.isUsingFixedMemoryPerSlot());
+               
assertEquals(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(), 
configured.getHighPriorityPoolRatio(), 0.0);
+               assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), 
configured.getWriteBufferRatio(), 0.0);
        }
 
        @Test
        public void testConfigureIllegalMemoryControlParameters() {
-               StateBackend storageBackend = new MemoryStateBackend();
-               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
+               RocksDBMemoryConfiguration memSettings = new 
RocksDBMemoryConfiguration();
+
+               verifySetParameter(() -> 
memSettings.setFixedMemoryPerSlot("-1B"));
+               verifySetParameter(() -> 
memSettings.setHighPriorityPoolRatio(-0.1));
+               verifySetParameter(() -> 
memSettings.setHighPriorityPoolRatio(1.1));
+               verifySetParameter(() -> memSettings.setWriteBufferRatio(-0.1));
+               verifySetParameter(() -> memSettings.setWriteBufferRatio(1.1));
 
-               verifySetParameter(() -> 
rocksDbBackend.setTotalMemoryPerSlot("-1B"));
-               verifySetParameter(() -> 
rocksDbBackend.setHighPriPoolRatio(-0.1));
-               verifySetParameter(() -> 
rocksDbBackend.setHighPriPoolRatio(1.1));
-               verifySetParameter(() -> 
rocksDbBackend.setWriteBufferRatio(-0.1));
-               verifySetParameter(() -> 
rocksDbBackend.setWriteBufferRatio(1.1));
+               memSettings.setFixedMemoryPerSlot("128MB");
+               memSettings.setWriteBufferRatio(0.6);
+               memSettings.setHighPriorityPoolRatio(0.6);
 
-               rocksDbBackend.setTotalMemoryPerSlot("128MB");
-               rocksDbBackend.setWriteBufferRatio(0.6);
-               rocksDbBackend.setHighPriPoolRatio(0.6);
                try {
                        // sum of writeBufferRatio and highPriPoolRatio larger 
than 1.0
-                       rocksDbBackend.configure(new Configuration(), 
Thread.currentThread().getContextClassLoader());
-                       fail("No expected IllegalArgumentException.");
+                       memSettings.validate();
+                       fail("Expected an IllegalArgumentException.");
                } catch (IllegalArgumentException expected) {
                        // expected exception
                }

Reply via email to