This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 112e86558df345c533322021823fb3d01f225854 Author: Yun Tang <myas...@live.com> AuthorDate: Wed Dec 4 15:41:18 2019 +0800 [FLINK-14484][state-backend] Control memory usage of RocksDB via Cache and WriteBufferManager This closes #10416 --- .../streaming/state/RocksDBKeyedStateBackend.java | 5 + .../contrib/streaming/state/RocksDBOptions.java | 20 ++ .../streaming/state/RocksDBSharedObjects.java | 50 +++++ .../streaming/state/RocksDBStateBackend.java | 142 +++++++++++++- .../RocksDBStateBackendBoundedMemoryTest.java | 204 +++++++++++++++++++++ .../state/RocksDBStateBackendConfigTest.java | 50 +++++ 6 files changed, 469 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 16c8757..07b1fd8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -403,6 +403,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @VisibleForTesting + DBOptions getDbOptions() { + return dbOptions; + } + + @VisibleForTesting boolean isDisposed() { return this.disposed; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java index 8ffd142..928882f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java @@ -86,4 +86,24 @@ public class RocksDBOptions { "The default options factory is %s, and it would read the configured options which provided in 'RocksDBConfigurableOptions'.", DefaultConfigurableOptionsFactory.class.getName())); + public static final ConfigOption<String> BOUNDED_MEMORY_SIZE = ConfigOptions + .key("state.backend.rocksdb.per-slot.total.memory") + .stringType() + .noDefaultValue() + .withDescription("The total memory size shared among all RocksDB instances per slot. " + + "This option has no default value which means no bounded memory limit."); + + public static final ConfigOption<Double> WRITE_BUFFER_RATIO = ConfigOptions + .key("state.backend.rocksdb.write-buffer.ratio") + .doubleType() + .defaultValue(0.5) + .withDescription(String.format("This option would only take effect when %s is configured, " + + "all RocksDB instances would share the same write buffer manager with the ratio of a LRUCache.", BOUNDED_MEMORY_SIZE)); + + public static final ConfigOption<Double> HIGH_PRI_POOL_RATIO = ConfigOptions + .key("state.backend.rocksdb.high-pri-pool.ratio") + .doubleType() + .defaultValue(0.1) + .withDescription(String.format("This option would only take effect when %s is configured, " + + "all RocksDB instances would share the high priority ratio for index, filter, and compression dictionary blocks in a LRUCache.", BOUNDED_MEMORY_SIZE)); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java new file mode 100644 index 0000000..188c506 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedObjects.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.rocksdb.Cache; +import org.rocksdb.WriteBufferManager; + +/** + * Shared objects among RocksDB instances per slot. + */ +public class RocksDBSharedObjects implements AutoCloseable { + + private final Cache cache; + private final WriteBufferManager writeBufferManager; + + RocksDBSharedObjects(Cache cache, WriteBufferManager writeBufferManager) { + this.cache = cache; + this.writeBufferManager = writeBufferManager; + } + + @Override + public void close() { + writeBufferManager.close(); + cache.close(); + } + + public Cache getCache() { + return cache; + } + + public WriteBufferManager getWriteBufferManager() { + return writeBufferManager; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 8563895..90e8583 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,10 +24,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -50,10 +52,15 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TernaryBoolean; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; +import org.rocksdb.TableFormatConfig; +import org.rocksdb.WriteBufferManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,10 +77,15 @@ import java.util.Collection; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.BOUNDED_MEMORY_SIZE; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.HIGH_PRI_POOL_RATIO; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED; +import static org.apache.flink.contrib.streaming.state.RocksDBOptions.WRITE_BUFFER_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -111,6 +123,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu private static final int UNDEFINED_NUMBER_OF_TRANSFER_THREADS = -1; + static final long UNDEFINED_VALUE = -1; + // ------------------------------------------------------------------------ // -- configuration values, set in the application / configuration @@ -146,6 +160,18 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu */ private TernaryBoolean enableTtlCompactionFilter; + /** + * Total memory for all rocksDB instances at this slot. + * Currently, we would use a LRU cache to serve as the total memory usage. + */ + private long totalMemoryPerSlot; + + /** The write buffer ratio which consumed by write-buffer manager from the shared cache. */ + private double writeBufferRatio; + + /** The high priority pool ratio which consumed by index&filter from the shared cache. */ + private double highPriPoolRatio; + /** This determines the type of priority queue state. */ private final PriorityQueueStateType priorityQueueStateType; @@ -267,6 +293,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.priorityQueueStateType = PriorityQueueStateType.HEAP; this.defaultMetricOptions = new RocksDBNativeMetricOptions(); this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED; + this.totalMemoryPerSlot = UNDEFINED_VALUE; + this.writeBufferRatio = UNDEFINED_VALUE; + this.highPriPoolRatio = UNDEFINED_VALUE; } /** @@ -312,6 +341,33 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu this.enableTtlCompactionFilter = original.enableTtlCompactionFilter .resolveUndefined(config.getBoolean(TTL_COMPACT_FILTER_ENABLED)); + configureBoundedMemory( + () -> (double) original.getTotalMemoryPerSlot(), + () -> { + if (config.getString(BOUNDED_MEMORY_SIZE) != null) { + setTotalMemoryPerSlot(config.getString(BOUNDED_MEMORY_SIZE)); + } else { + // we still left total memory size per slot as undefined if no actual settings. + this.totalMemoryPerSlot = UNDEFINED_VALUE; + } + }, + () -> setTotalMemoryPerSlot(String.valueOf(original.totalMemoryPerSlot))); + + configureBoundedMemory( + original::getWriteBufferRatio, + () -> setWriteBufferRatio(config.getDouble(WRITE_BUFFER_RATIO)), + () -> setWriteBufferRatio(original.writeBufferRatio)); + + configureBoundedMemory( + original::getHighPriPoolRatio, + () -> setHighPriPoolRatio(config.getDouble(HIGH_PRI_POOL_RATIO)), + () -> setHighPriPoolRatio(original.highPriPoolRatio)); + + if (isBoundedMemoryEnabled()) { + Preconditions.checkArgument(this.writeBufferRatio + this.highPriPoolRatio < 1.0, + String.format("Illegal sum of writeBufferRatio %s and highPriPoolRatio %s, should be less than 1.0", this.writeBufferRatio, this.highPriPoolRatio)); + } + final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY); this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ? @@ -356,6 +412,14 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } } + private void configureBoundedMemory(Supplier<Double> getOriginalValue, Runnable parseFromConfig, Runnable setFromOriginal) { + if (getOriginalValue.get() == UNDEFINED_VALUE) { + parseFromConfig.run(); + } else { + setFromOriginal.run(); + } + } + // ------------------------------------------------------------------------ // Reconfiguration // ------------------------------------------------------------------------ @@ -491,14 +555,53 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig(); + DBOptions dbOptions = getDbOptions(); + Function<String, ColumnFamilyOptions> createColumnOptions; + + if (isBoundedMemoryEnabled()) { + RocksDBSharedObjects rocksDBSharedObjects; + MemoryManager memoryManager = env.getMemoryManager(); + // only initialized LRUCache and write buffer manager once. + // we would not dispose the cache and write buffer manager during disposing state backend + // as the same objects are shared by every RocksDB instance per slot. + while ((rocksDBSharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject()) == null) { + if (memoryManager.getLazyInitializeSharedObjectHelper().compareAndSet(false, true)) { + Cache lruCache = new LRUCache(getTotalMemoryPerSlot(), -1, false, getHighPriPoolRatio()); + WriteBufferManager writeBufferManager = new WriteBufferManager((long) (getTotalMemoryPerSlot() * getWriteBufferRatio()), lruCache); + + rocksDBSharedObjects = new RocksDBSharedObjects(lruCache, writeBufferManager); + memoryManager.setStateBackendSharedObject(rocksDBSharedObjects); + } + } + + Cache blockCache = checkNotNull(rocksDBSharedObjects.getCache()); + dbOptions.setWriteBufferManager(checkNotNull(rocksDBSharedObjects.getWriteBufferManager())); + + createColumnOptions = stateName -> { + ColumnFamilyOptions columnOptions = getColumnOptions(); + TableFormatConfig tableFormatConfig = columnOptions.tableFormatConfig(); + Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig, + "We currently only support BlockBasedTableConfig When bounding total memory."); + BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; + blockBasedTableConfig.setBlockCache(blockCache); + blockBasedTableConfig.setCacheIndexAndFilterBlocks(true); + blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); + blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true); + columnOptions.setTableFormatConfig(blockBasedTableConfig); + return columnOptions; + }; + } else { + createColumnOptions = stateName -> getColumnOptions(); + } + ExecutionConfig executionConfig = env.getExecutionConfig(); StreamCompressionDecorator keyGroupCompressionDecorator = getCompressionDecorator(executionConfig); RocksDBKeyedStateBackendBuilder<K> builder = new RocksDBKeyedStateBackendBuilder<>( operatorIdentifier, env.getUserClassLoader(), instanceBasePath, - getDbOptions(), - stateName -> getColumnOptions(), + dbOptions, + createColumnOptions, kvStateRegistry, keySerializer, numberOfKeyGroups, @@ -705,6 +808,13 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu } /** + * Gets whether bounding memory is enabled for this state backend. + */ + public boolean isBoundedMemoryEnabled() { + return totalMemoryPerSlot > 0; + } + + /** * Enable compaction filter to cleanup state with TTL is enabled. * * <p>Note: User can still decide in state TTL configuration in state descriptor @@ -853,6 +963,34 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu setNumberOfTransferThreads(numberOfTransferingThreads); } + public void setTotalMemoryPerSlot(String totalMemoryPerSlotStr) { + long totalMemoryPerSlot = MemorySize.parseBytes(totalMemoryPerSlotStr); + Preconditions.checkArgument(totalMemoryPerSlot > 0, String.format("Illegal total memory per slot %s for RocksDBs.", totalMemoryPerSlot)); + this.totalMemoryPerSlot = totalMemoryPerSlot; + } + + public long getTotalMemoryPerSlot() { + return totalMemoryPerSlot; + } + + public void setWriteBufferRatio(double writeBufferRatio) { + Preconditions.checkArgument(writeBufferRatio > 0 && writeBufferRatio < 1.0, String.format("Illegal write buffer ratio %s for RocksDBs.", writeBufferRatio)); + this.writeBufferRatio = writeBufferRatio; + } + + public double getWriteBufferRatio() { + return writeBufferRatio; + } + + public void setHighPriPoolRatio(double highPriPoolRatio) { + Preconditions.checkArgument(highPriPoolRatio > 0 && highPriPoolRatio < 1.0, String.format("Illegal high priority pool ratio %s for RocksDBs.", highPriPoolRatio)); + this.highPriPoolRatio = highPriPoolRatio; + } + + public double getHighPriPoolRatio() { + return highPriPoolRatio; + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java new file mode 100644 index 0000000..c564eb4 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendBoundedMemoryTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.memory.MemoryManagerBuilder; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.Cache; +import org.rocksdb.WriteBufferManager; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.fail; + +/** + * Tests to verify memory bounded for rocksDB state backend. + */ +public class RocksDBStateBackendBoundedMemoryTest { + + private static final int MEMORY_SIZE = 64 * 1024 * 1024; // 64 MiBytes + + private MemoryManager memoryManager; + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void setUp() { + this.memoryManager = MemoryManagerBuilder + .newBuilder() + .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE) + .build(); + } + + @After + public void shutDown() { + this.memoryManager.shutdown(); + } + + @Test + public void testSharedObjectsInitializeOnlyOnce() throws IOException { + DummyEnvironment env = new DummyEnvironment(); + env.setMemoryManager(memoryManager); + + Assert.assertNull(memoryManager.getStateBackendSharedObject()); + + Configuration configuration = new Configuration(); + configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); + RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); + originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); + + RocksDBStateBackend rocksDBStateBackend1 = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); + AbstractKeyedStateBackend keyedStateBackend1 = createKeyedStateBackend(rocksDBStateBackend1, env); + + Assert.assertTrue("The shared object should be a RocksDBSharedObject instance but actually not", + memoryManager.getStateBackendSharedObject() instanceof RocksDBSharedObjects); + RocksDBSharedObjects sharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); + + Cache lruCache = sharedObjects.getCache(); + WriteBufferManager writeBufferManager = sharedObjects.getWriteBufferManager(); + + RocksDBStateBackend rocksDBStateBackend2 = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); + AbstractKeyedStateBackend keyedStateBackend2 = createKeyedStateBackend(rocksDBStateBackend2, env); + + // Another keyed state backend is created but only initialized once for cache and write buffer manager. + sharedObjects = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); + + try { + Assert.assertEquals(lruCache, sharedObjects.getCache()); + Assert.assertEquals(writeBufferManager, sharedObjects.getWriteBufferManager()); + } finally { + keyedStateBackend1.close(); + keyedStateBackend2.close(); + } + } + + @Test + public void testSharedObjectsNotClosedAfterKeyedStateBackendClosed() throws IOException { + DummyEnvironment env = new DummyEnvironment(); + env.setMemoryManager(memoryManager); + Assert.assertNull(memoryManager.getStateBackendSharedObject()); + + Configuration configuration = new Configuration(); + configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); + RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); + originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); + RocksDBStateBackend rocksDBStateBackend = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); + AbstractKeyedStateBackend keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, env); + + Assert.assertTrue("The shared object should be a RocksDBSharedObject instance but actually not", + memoryManager.getStateBackendSharedObject() instanceof RocksDBSharedObjects); + + RocksDBSharedObjects stateBackendSharedObject = (RocksDBSharedObjects) memoryManager.getStateBackendSharedObject(); + Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle()); + Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle()); + + keyedStateBackend.close(); + // even keyed state backend closed, cache and write buffer manager would not be disposed. + Assert.assertTrue(stateBackendSharedObject.getCache().isOwningHandle()); + Assert.assertTrue(stateBackendSharedObject.getWriteBufferManager().isOwningHandle()); + } + + @Test + public void testSharedObjectsInitializedConcurrently() throws IOException { + DummyEnvironment env = new DummyEnvironment(); + env.setMemoryManager(memoryManager); + + Assert.assertNull(memoryManager.getStateBackendSharedObject()); + + Configuration configuration = new Configuration(); + configuration.setString(RocksDBOptions.BOUNDED_MEMORY_SIZE, "128MB"); + RocksDBStateBackend originalStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); + originalStateBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); + + int numThreads = 4; + Thread[] threads = new Thread[numThreads]; + RocksDBKeyedStateBackend[] keyedStateBackends = new RocksDBKeyedStateBackend[numThreads]; + for (int i = 0; i < numThreads; i++) { + int index = i; + threads[index] = new Thread(() -> { + RocksDBStateBackend rocksDBStateBackend = originalStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()); + try { + keyedStateBackends[index] = (RocksDBKeyedStateBackend) createKeyedStateBackend(rocksDBStateBackend, env); + } catch (IOException ignored) { + } + }); + threads[i].setDaemon(true); + } + + // launch concurrently + for (int i = 0; i < numThreads; i++) { + threads[i].start(); + } + + try { + // sync + for (int i = 0; i < numThreads; i++) { + threads[i].join(5000); + } + } catch (InterruptedException e) { + fail(e.getMessage()); + } + + WriteBufferManager writeBufferManager = null; + for (RocksDBKeyedStateBackend keyedStateBackend : keyedStateBackends) { + if (writeBufferManager == null) { + writeBufferManager = keyedStateBackend.getDbOptions().writeBufferManager(); + } else { + // different keyed state backends but share the same write buffer manager + Assert.assertEquals(writeBufferManager, keyedStateBackend.getDbOptions().writeBufferManager()); + } + keyedStateBackend.close(); + } + } + + private AbstractKeyedStateBackend createKeyedStateBackend(RocksDBStateBackend rocksDBStateBackend, Environment env) throws IOException { + return rocksDBStateBackend.createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); + + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index c00ffdb..9cd338f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -632,6 +632,56 @@ public class RocksDBStateBackendConfigTest { } // ------------------------------------------------------------------------ + // RocksDB Memory Control + // ------------------------------------------------------------------------ + + @Test + public void testDefaultMemoryControlParameters() { + StateBackend storageBackend = new MemoryStateBackend(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); + assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, rocksDbBackend.getTotalMemoryPerSlot()); + assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, rocksDbBackend.getHighPriPoolRatio(), 0.0); + assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, rocksDbBackend.getWriteBufferRatio(), 0.0); + + RocksDBStateBackend configure = rocksDbBackend.configure(new Configuration(), Thread.currentThread().getContextClassLoader()); + assertEquals(RocksDBStateBackend.UNDEFINED_VALUE, configure.getTotalMemoryPerSlot()); + assertEquals(RocksDBOptions.HIGH_PRI_POOL_RATIO.defaultValue(), configure.getHighPriPoolRatio(), 0.0); + assertEquals(RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(), configure.getWriteBufferRatio(), 0.0); + } + + @Test + public void testConfigureIllegalMemoryControlParameters() { + StateBackend storageBackend = new MemoryStateBackend(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); + + verifySetParameter(() -> rocksDbBackend.setTotalMemoryPerSlot("-1B")); + verifySetParameter(() -> rocksDbBackend.setHighPriPoolRatio(-0.1)); + verifySetParameter(() -> rocksDbBackend.setHighPriPoolRatio(1.1)); + verifySetParameter(() -> rocksDbBackend.setWriteBufferRatio(-0.1)); + verifySetParameter(() -> rocksDbBackend.setWriteBufferRatio(1.1)); + + rocksDbBackend.setTotalMemoryPerSlot("128MB"); + rocksDbBackend.setWriteBufferRatio(0.6); + rocksDbBackend.setHighPriPoolRatio(0.6); + try { + // sum of writeBufferRatio and highPriPoolRatio larger than 1.0 + rocksDbBackend.configure(new Configuration(), Thread.currentThread().getContextClassLoader()); + fail("No expected IllegalArgumentException."); + } catch (IllegalArgumentException expected) { + // expected exception + } + } + + private void verifySetParameter(Runnable setter) { + try { + setter.run(); + fail("No expected IllegalArgumentException."); + } catch (IllegalArgumentException expected) { + // expected exception + } + } + + // ------------------------------------------------------------------------ // Contained Non-partitioned State Backend // ------------------------------------------------------------------------