This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 112e86558df345c533322021823fb3d01f225854
Author: Yun Tang <myas...@live.com>
AuthorDate: Wed Dec 4 15:41:18 2019 +0800

    [FLINK-14484][state-backend] Control memory usage of RocksDB via Cache and 
WriteBufferManager
    
    This closes #10416
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |   5 +
 .../contrib/streaming/state/RocksDBOptions.java    |  20 ++
 .../streaming/state/RocksDBSharedObjects.java      |  50 +++++
 .../streaming/state/RocksDBStateBackend.java       | 142 +++++++++++++-
 .../RocksDBStateBackendBoundedMemoryTest.java      | 204 +++++++++++++++++++++
 .../state/RocksDBStateBackendConfigTest.java       |  50 +++++
 6 files changed, 469 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 16c8757..07b1fd8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -403,6 +403,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @VisibleForTesting
+       DBOptions getDbOptions() {
+               return dbOptions;
+       }
+
+       @VisibleForTesting
        boolean isDisposed() {
                return this.disposed;
        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 8ffd142..928882f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -86,4 +86,24 @@ public class RocksDBOptions {
                                "The default options factory is %s, and it 
would read the configured options which provided in 
'RocksDBConfigurableOptions'.",
                                
DefaultConfigurableOptionsFactory.class.getName()));
 
+       public static final ConfigOption<String> BOUNDED_MEMORY_SIZE = 
ConfigOptions
+               .key("state.backend.rocksdb.per-slot.total.memory")
+               .stringType()
+               .noDefaultValue()
+               .withDescription("The total memory size shared among all 
RocksDB instances per slot. " +
+                       "This option has no default value which means no 
bounded memory limit.");
+
+       public static final ConfigOption<Double> WRITE_BUFFER_RATIO = 
ConfigOptions
+               .key("state.backend.rocksdb.write-buffer.ratio")
+               .doubleType()
+               .defaultValue(0.5)
+               .withDescription(String.format("This option would only take 
effect when %s is configured, " +
+                       "all RocksDB instances would share the same write 
buffer manager with the ratio of a LRUCache.", BOUNDED_MEMORY_SIZE));
+
+       public static final ConfigOption<Double> HIGH_PRI_POOL_RATIO = 
ConfigOptions
+               .key("state.backend.rocksdb.high-pri-pool.ratio")
+               .doubleType()
+               .defaultValue(0.1)
+               .withDescription(String.format("This option would only take 
effect when %s is configured, " +
+                       "all RocksDB instances would share the high priority 
ratio for index, filter, and compression dictionary blocks in a LRUCache.", 
BOUNDED_MEMORY_SIZE));
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
new file mode 100644
index 0000000..188c506
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.Cache;
+import org.rocksdb.WriteBufferManager;
+
+/**
+ * Shared objects among RocksDB instances per slot.
+ */
+public class RocksDBSharedObjects implements AutoCloseable {
+
+       private final Cache cache;
+       private final WriteBufferManager writeBufferManager;
+
+       RocksDBSharedObjects(Cache cache, WriteBufferManager 
writeBufferManager) {
+               this.cache = cache;
+               this.writeBufferManager = writeBufferManager;
+       }
+
+       @Override
+       public void close() {
+               writeBufferManager.close();
+               cache.close();
+       }
+
+       public Cache getCache() {
+               return cache;
+       }
+
+       public WriteBufferManager getWriteBufferManager() {
+               return writeBufferManager;
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 8563895..90e8583 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,10 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -50,10 +52,15 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TernaryBoolean;
 
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.RocksDB;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteBufferManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +77,15 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.BOUNDED_MEMORY_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.HIGH_PRI_POOL_RATIO;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -111,6 +123,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
 
        private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1;
 
+       static final long UNDEFINED_VALUE = -1;
+
        // 
------------------------------------------------------------------------
 
        // -- configuration values, set in the application / configuration
@@ -146,6 +160,18 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        private TernaryBoolean enableTtlCompactionFilter;
 
+       /**
+        * Total memory for all rocksDB instances at this slot.
+        * Currently, we would use a LRU cache to serve as the total memory 
usage.
+        */
+       private long totalMemoryPerSlot;
+
+       /** The write buffer ratio which consumed by write-buffer manager from 
the shared cache. */
+       private double writeBufferRatio;
+
+       /** The high priority pool ratio which consumed by index&filter from 
the shared cache. */
+       private double highPriPoolRatio;
+
        /** This determines the type of priority queue state. */
        private final PriorityQueueStateType priorityQueueStateType;
 
@@ -267,6 +293,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.priorityQueueStateType = PriorityQueueStateType.HEAP;
                this.defaultMetricOptions = new RocksDBNativeMetricOptions();
                this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
+               this.totalMemoryPerSlot = UNDEFINED_VALUE;
+               this.writeBufferRatio = UNDEFINED_VALUE;
+               this.highPriPoolRatio = UNDEFINED_VALUE;
        }
 
        /**
@@ -312,6 +341,33 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter
                        
.resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED));
 
+               configureBoundedMemory(
+                       () -> (double) original.getTotalMemoryPerSlot(),
+                       () -> {
+                               if (config.getString(BOUNDED_MEMORY_SIZE) != 
null) {
+                                       
setTotalMemoryPerSlot(config.getString(BOUNDED_MEMORY_SIZE));
+                               } else {
+                                       // we still left total memory size per 
slot as undefined if no actual settings.
+                                       this.totalMemoryPerSlot = 
UNDEFINED_VALUE;
+                               }
+                       },
+                       () -> 
setTotalMemoryPerSlot(String.valueOf(original.totalMemoryPerSlot)));
+
+               configureBoundedMemory(
+                       original::getWriteBufferRatio,
+                       () -> 
setWriteBufferRatio(config.getDouble(WRITE_BUFFER_RATIO)),
+                       () -> setWriteBufferRatio(original.writeBufferRatio));
+
+               configureBoundedMemory(
+                       original::getHighPriPoolRatio,
+                       () -> 
setHighPriPoolRatio(config.getDouble(HIGH_PRI_POOL_RATIO)),
+                       () -> setHighPriPoolRatio(original.highPriPoolRatio));
+
+               if (isBoundedMemoryEnabled()) {
+                       Preconditions.checkArgument(this.writeBufferRatio + 
this.highPriPoolRatio < 1.0,
+                               String.format("Illegal sum of writeBufferRatio 
%s and highPriPoolRatio %s, should be less than 1.0", this.writeBufferRatio, 
this.highPriPoolRatio));
+               }
+
                final String priorityQueueTypeString = 
config.getString(TIMER_SERVICE_FACTORY);
 
                this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
@@ -356,6 +412,14 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                }
        }
 
+       private void configureBoundedMemory(Supplier<Double> getOriginalValue, 
Runnable parseFromConfig, Runnable setFromOriginal) {
+               if (getOriginalValue.get() == UNDEFINED_VALUE) {
+                       parseFromConfig.run();
+               } else {
+                       setFromOriginal.run();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Reconfiguration
        // 
------------------------------------------------------------------------
@@ -491,14 +555,53 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                LocalRecoveryConfig localRecoveryConfig =
                        env.getTaskStateManager().createLocalRecoveryConfig();
 
+               DBOptions dbOptions = getDbOptions();
+               Function<String, ColumnFamilyOptions> createColumnOptions;
+
+               if (isBoundedMemoryEnabled()) {
+                       RocksDBSharedObjects rocksDBSharedObjects;
+                       MemoryManager memoryManager = env.getMemoryManager();
+                       // only initialized LRUCache and write buffer manager 
once.
+                       // we would not dispose the cache and write buffer 
manager during disposing state backend
+                       // as the same objects are shared by every RocksDB 
instance per slot.
+                       while ((rocksDBSharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject()) == null) {
+                               if 
(memoryManager.getLazyInitializeSharedObjectHelper().compareAndSet(false, 
true)) {
+                                       Cache lruCache = new 
LRUCache(getTotalMemoryPerSlot(), -1, false, getHighPriPoolRatio());
+                                       WriteBufferManager writeBufferManager = 
new WriteBufferManager((long) (getTotalMemoryPerSlot() * 
getWriteBufferRatio()), lruCache);
+
+                                       rocksDBSharedObjects = new 
RocksDBSharedObjects(lruCache, writeBufferManager);
+                                       
memoryManager.setStateBackendSharedObject(rocksDBSharedObjects);
+                               }
+                       }
+
+                       Cache blockCache = 
checkNotNull(rocksDBSharedObjects.getCache());
+                       
dbOptions.setWriteBufferManager(checkNotNull(rocksDBSharedObjects.getWriteBufferManager()));
+
+                       createColumnOptions = stateName -> {
+                               ColumnFamilyOptions columnOptions = 
getColumnOptions();
+                               TableFormatConfig tableFormatConfig = 
columnOptions.tableFormatConfig();
+                               Preconditions.checkArgument(tableFormatConfig 
instanceof BlockBasedTableConfig,
+                                       "We currently only support 
BlockBasedTableConfig When bounding total memory.");
+                               BlockBasedTableConfig blockBasedTableConfig = 
(BlockBasedTableConfig) tableFormatConfig;
+                               blockBasedTableConfig.setBlockCache(blockCache);
+                               
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
+                               
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+                               
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
+                               
columnOptions.setTableFormatConfig(blockBasedTableConfig);
+                               return columnOptions;
+                       };
+               } else {
+                       createColumnOptions = stateName -> getColumnOptions();
+               }
+
                ExecutionConfig executionConfig = env.getExecutionConfig();
                StreamCompressionDecorator keyGroupCompressionDecorator = 
getCompressionDecorator(executionConfig);
                RocksDBKeyedStateBackendBuilder<K> builder = new 
RocksDBKeyedStateBackendBuilder<>(
                        operatorIdentifier,
                        env.getUserClassLoader(),
                        instanceBasePath,
-                       getDbOptions(),
-                       stateName -> getColumnOptions(),
+                       dbOptions,
+                       createColumnOptions,
                        kvStateRegistry,
                        keySerializer,
                        numberOfKeyGroups,
@@ -705,6 +808,13 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        }
 
        /**
+        * Gets whether bounding memory is enabled for this state backend.
+        */
+       public boolean isBoundedMemoryEnabled() {
+               return totalMemoryPerSlot > 0;
+       }
+
+       /**
         * Enable compaction filter to cleanup state with TTL is enabled.
         *
         * <p>Note: User can still decide in state TTL configuration in state 
descriptor
@@ -853,6 +963,34 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                setNumberOfTransferThreads(numberOfTransferingThreads);
        }
 
+       public void setTotalMemoryPerSlot(String totalMemoryPerSlotStr) {
+               long totalMemoryPerSlot = 
MemorySize.parseBytes(totalMemoryPerSlotStr);
+               Preconditions.checkArgument(totalMemoryPerSlot > 0, 
String.format("Illegal total memory per slot %s for RocksDBs.", 
totalMemoryPerSlot));
+               this.totalMemoryPerSlot = totalMemoryPerSlot;
+       }
+
+       public long getTotalMemoryPerSlot() {
+               return totalMemoryPerSlot;
+       }
+
+       public void setWriteBufferRatio(double writeBufferRatio) {
+               Preconditions.checkArgument(writeBufferRatio > 0 && 
writeBufferRatio < 1.0, String.format("Illegal write buffer ratio %s for 
RocksDBs.", writeBufferRatio));
+               this.writeBufferRatio = writeBufferRatio;
+       }
+
+       public double getWriteBufferRatio() {
+               return writeBufferRatio;
+       }
+
+       public void setHighPriPoolRatio(double highPriPoolRatio) {
+               Preconditions.checkArgument(highPriPoolRatio > 0 && 
highPriPoolRatio < 1.0, String.format("Illegal high priority pool ratio %s for 
RocksDBs.", highPriPoolRatio));
+               this.highPriPoolRatio = highPriPoolRatio;
+       }
+
+       public double getHighPriPoolRatio() {
+               return highPriPoolRatio;
+       }
+
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
new file mode 100644
index 0000000..c564eb4
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.memory.MemoryManagerBuilder;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Cache;
+import org.rocksdb.WriteBufferManager;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests to verify memory bounded for rocksDB state backend.
+ */
+public class RocksDBStateBackendBoundedMemoryTest {
+
+       private static final int MEMORY_SIZE = 64 * 1024 * 1024; // 64 MiBytes
+
+       private MemoryManager memoryManager;
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void setUp() {
+               this.memoryManager = MemoryManagerBuilder
+                       .newBuilder()
+                       .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE)
+                       .build();
+       }
+
+       @After
+       public void shutDown() {
+               this.memoryManager.shutdown();
+       }
+
+       @Test
+       public void testSharedObjectsInitializeOnlyOnce() throws IOException {
+               DummyEnvironment env = new DummyEnvironment();
+               env.setMemoryManager(memoryManager);
+
+               Assert.assertNull(memoryManager.getStateBackendSharedObject());
+
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
+               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
+               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
+
+               RocksDBStateBackend rocksDBStateBackend1 = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+               AbstractKeyedStateBackend keyedStateBackend1 = 
createKeyedStateBackend(rocksDBStateBackend1, env);
+
+               Assert.assertTrue("The shared object should be a 
RocksDBSharedObject instance but actually not",
+                       memoryManager.getStateBackendSharedObject() instanceof 
RocksDBSharedObjects);
+               RocksDBSharedObjects sharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject();
+
+               Cache lruCache = sharedObjects.getCache();
+               WriteBufferManager writeBufferManager = 
sharedObjects.getWriteBufferManager();
+
+               RocksDBStateBackend rocksDBStateBackend2 = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+               AbstractKeyedStateBackend keyedStateBackend2 = 
createKeyedStateBackend(rocksDBStateBackend2, env);
+
+               // Another keyed state backend is created but only initialized 
once for cache and write buffer manager.
+               sharedObjects = (RocksDBSharedObjects) 
memoryManager.getStateBackendSharedObject();
+
+               try {
+                       Assert.assertEquals(lruCache, sharedObjects.getCache());
+                       Assert.assertEquals(writeBufferManager, 
sharedObjects.getWriteBufferManager());
+               } finally {
+                       keyedStateBackend1.close();
+                       keyedStateBackend2.close();
+               }
+       }
+
+       @Test
+       public void testSharedObjectsNotClosedAfterKeyedStateBackendClosed() 
throws IOException {
+               DummyEnvironment env = new DummyEnvironment();
+               env.setMemoryManager(memoryManager);
+               Assert.assertNull(memoryManager.getStateBackendSharedObject());
+
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
+               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
+               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
+               RocksDBStateBackend rocksDBStateBackend = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+               AbstractKeyedStateBackend keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, env);
+
+               Assert.assertTrue("The shared object should be a 
RocksDBSharedObject instance but actually not",
+                       memoryManager.getStateBackendSharedObject() instanceof 
RocksDBSharedObjects);
+
+               RocksDBSharedObjects stateBackendSharedObject = 
(RocksDBSharedObjects) memoryManager.getStateBackendSharedObject();
+               
Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle());
+               
Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle());
+
+               keyedStateBackend.close();
+               // even keyed state backend closed, cache and write buffer 
manager would not be disposed.
+               
Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle());
+               
Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle());
+       }
+
+       @Test
+       public void testSharedObjectsInitializedConcurrently() throws 
IOException {
+               DummyEnvironment env = new DummyEnvironment();
+               env.setMemoryManager(memoryManager);
+
+               Assert.assertNull(memoryManager.getStateBackendSharedObject());
+
+               Configuration configuration = new Configuration();
+               configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, 
"128MB");
+               RocksDBStateBackend originalStateBackend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
+               
originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
+
+               int numThreads = 4;
+               Thread[] threads = new Thread[numThreads];
+               RocksDBKeyedStateBackend[] keyedStateBackends = new 
RocksDBKeyedStateBackend[numThreads];
+               for (int i = 0; i < numThreads; i++) {
+                       int index = i;
+                       threads[index] = new Thread(() -> {
+                               RocksDBStateBackend rocksDBStateBackend = 
originalStateBackend.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+                               try {
+                                       keyedStateBackends[index] = 
(RocksDBKeyedStateBackend) createKeyedStateBackend(rocksDBStateBackend, env);
+                               } catch (IOException ignored) {
+                               }
+                       });
+                       threads[i].setDaemon(true);
+               }
+
+               // launch concurrently
+               for (int i = 0; i < numThreads; i++) {
+                       threads[i].start();
+               }
+
+               try {
+                       // sync
+                       for (int i = 0; i < numThreads; i++) {
+                               threads[i].join(5000);
+                       }
+               } catch (InterruptedException e) {
+                       fail(e.getMessage());
+               }
+
+               WriteBufferManager writeBufferManager = null;
+               for (RocksDBKeyedStateBackend keyedStateBackend : 
keyedStateBackends) {
+                       if (writeBufferManager == null) {
+                               writeBufferManager = 
keyedStateBackend.getDbOptions().writeBufferManager();
+                       } else {
+                               // different keyed state backends but share the 
same write buffer manager
+                               Assert.assertEquals(writeBufferManager, 
keyedStateBackend.getDbOptions().writeBufferManager());
+                       }
+                       keyedStateBackend.close();
+               }
+       }
+
+       private AbstractKeyedStateBackend 
createKeyedStateBackend(RocksDBStateBackend rocksDBStateBackend, Environment 
env) throws IOException {
+               return rocksDBStateBackend.createKeyedStateBackend(
+                       env,
+                       env.getJobID(),
+                       "test_op",
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
+
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index c00ffdb..9cd338f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -632,6 +632,56 @@ public class RocksDBStateBackendConfigTest {
        }
 
        // 
------------------------------------------------------------------------
+       //  RocksDB Memory Control
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testDefaultMemoryControlParameters() {
+               StateBackend storageBackend = new MemoryStateBackend();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
+               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getTotalMemoryPerSlot());
+               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getHighPriPoolRatio(), 0.0);
+               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
rocksDbBackend.getWriteBufferRatio(), 0.0);
+
+               RocksDBStateBackend configure = rocksDbBackend.configure(new 
Configuration(), Thread.currentThread().getContextClassLoader());
+               assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, 
configure.getTotalMemoryPerSlot());
+               assertEquals(RocksDBOptions.HIGH_PRI_POOL_RATIO.defaultValue(), 
configure.getHighPriPoolRatio(), 0.0);
+               assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), 
configure.getWriteBufferRatio(), 0.0);
+       }
+
+       @Test
+       public void testConfigureIllegalMemoryControlParameters() {
+               StateBackend storageBackend = new MemoryStateBackend();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
+
+               verifySetParameter(() -> 
rocksDbBackend.setTotalMemoryPerSlot("-1B"));
+               verifySetParameter(() -> 
rocksDbBackend.setHighPriPoolRatio(-0.1));
+               verifySetParameter(() -> 
rocksDbBackend.setHighPriPoolRatio(1.1));
+               verifySetParameter(() -> 
rocksDbBackend.setWriteBufferRatio(-0.1));
+               verifySetParameter(() -> 
rocksDbBackend.setWriteBufferRatio(1.1));
+
+               rocksDbBackend.setTotalMemoryPerSlot("128MB");
+               rocksDbBackend.setWriteBufferRatio(0.6);
+               rocksDbBackend.setHighPriPoolRatio(0.6);
+               try {
+                       // sum of writeBufferRatio and highPriPoolRatio larger 
than 1.0
+                       rocksDbBackend.configure(new Configuration(), 
Thread.currentThread().getContextClassLoader());
+                       fail("No expected IllegalArgumentException.");
+               } catch (IllegalArgumentException expected) {
+                       // expected exception
+               }
+       }
+
+       private void verifySetParameter(Runnable setter) {
+               try {
+                       setter.run();
+                       fail("No expected IllegalArgumentException.");
+               } catch (IllegalArgumentException expected) {
+                       // expected exception
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        //  Contained Non-partitioned State Backend
        // 
------------------------------------------------------------------------
 

Reply via email to