This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4bb815a753a04767b72d9161e182db6d39b6a30c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 14:27:10 2025 +0200

    [FLINK-38415][checkpoint] Disable auto compaction to prevent Index OutOfBounds
---
 .../state/rocksdb/RocksDBKeyedStateBackend.java    |   5 +
 .../state/rocksdb/restore/RestoredDBInstance.java  |   9 +-
 .../RocksDBAutoCompactionIngestRestoreTest.java    | 177 ++++++++++++++++++
 .../restore/DistributeStateHandlerHelperTest.java  | 200 +++++++++++++++++++++
 4 files changed, 388 insertions(+), 3 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index edae8e5f391..34d1c019dd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -454,6 +454,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
     }
 
+    @VisibleForTesting
+    LinkedHashMap<String, RocksDbKvStateInfo> getKvStateInformation() {
+        return kvStateInformation;
+    }
+
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
     @Override
     public void dispose() {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
index afbe27bce9c..9b5c4b74ad3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
@@ -100,10 +100,14 @@ public class RestoredDBInstance implements AutoCloseable {
             Long writeBufferManagerCapacity)
             throws Exception {
 
+        Function<String, ColumnFamilyOptions> tempDBCfFactory =
+                stateName ->
+                        columnFamilyOptionsFactory.apply(stateName).setDisableAutoCompactions(true);
+
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 createColumnFamilyDescriptors(
                         stateMetaInfoSnapshots,
-                        columnFamilyOptionsFactory,
+                        tempDBCfFactory,
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity,
                         false);
@@ -118,8 +122,7 @@ public class RestoredDBInstance implements AutoCloseable {
                         restoreSourcePath.toString(),
                         columnFamilyDescriptors,
                         columnFamilyHandles,
-                        RocksDBOperationUtils.createColumnFamilyOptions(
-                                columnFamilyOptionsFactory, "default"),
+                        RocksDBOperationUtils.createColumnFamilyOptions(tempDBCfFactory, "default"),
                         dbOptions);
 
         return new RestoredDBInstance(
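
The two hunks above are the core of the fix: the temporary DB that RestoredDBInstance opens from the
restore source must not compact on its own, because a background compaction can remove level-0 SST
files between listing and exporting them during rescaling, which, per the test comments below, is
how the Index OutOfBounds in FLINK-38415 arises. A minimal standalone sketch of the same wrapping
pattern follows; the class and method names here are illustrative and not part of the commit:

    import org.rocksdb.ColumnFamilyOptions;

    import java.util.function.Function;

    /** Illustrative only: mirrors the tempDBCfFactory wrapping in the hunks above. */
    final class TempDbOptionsFactory {

        /**
         * Wraps a column family options factory so that every column family of a
         * temporary restore DB is opened with auto compaction disabled, while the
         * production DB keeps using the unwrapped factory.
         */
        static Function<String, ColumnFamilyOptions> disableAutoCompactions(
                Function<String, ColumnFamilyOptions> baseFactory) {
            return stateName -> baseFactory.apply(stateName).setDisableAutoCompactions(true);
        }
    }
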
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java
new file mode 100644
index 00000000000..165d1d7df33
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+
+import java.util.LinkedHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test to verify that auto-compaction is correctly configured during RocksDB incremental restore
+ * with ingest DB mode. This test ensures that production DBs maintain auto-compaction enabled while
+ * temporary DBs used during restore have auto-compaction disabled for performance.
+ */
+public class RocksDBAutoCompactionIngestRestoreTest {
+
+    @TempDir private java.nio.file.Path tempFolder;
+
+    private static final int MAX_PARALLELISM = 10;
+
+    @Test
+    public void testAutoCompactionEnabledWithIngestDBRestore() throws Exception {
+        // Create two subtask snapshots and merge them to trigger the multi-state-handle scenario
+        // required for reproducing the ingest DB restore path
+        OperatorSubtaskState operatorSubtaskState =
+                AbstractStreamOperatorTestHarness.repackageState(
+                        createSubtaskSnapshot(0), createSubtaskSnapshot(1));
+
+        OperatorSubtaskState initState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        operatorSubtaskState, MAX_PARALLELISM, 2, 1, 0);
+
+        // Restore with ingest DB mode and verify auto-compaction
+        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+                harness = createTestHarness(new TestKeyedFunction(), MAX_PARALLELISM, 1, 0)) {
+
+            EmbeddedRocksDBStateBackend stateBackend = createStateBackend(true);
+            harness.setStateBackend(stateBackend);
+            harness.setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + tempFolder.resolve("checkpoint-restore").toAbsolutePath()));
+
+            harness.initializeState(initState);
+            harness.open();
+
+            verifyAutoCompactionEnabled(harness);
+        }
+    }
+
+    private OperatorSubtaskState createSubtaskSnapshot(int subtaskIndex) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+                harness =
+                        createTestHarness(
+                                new TestKeyedFunction(), MAX_PARALLELISM, 2, subtaskIndex)) {
+
+            harness.setStateBackend(createStateBackend(false));
+            harness.setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://"
+                                    + tempFolder
+                                            .resolve("checkpoint-subtask" + subtaskIndex)
+                                            .toAbsolutePath()));
+            harness.open();
+
+            // Create an empty snapshot - data content doesn't matter for this test
+            return harness.snapshot(0, 0);
+        }
+    }
+
+    private void verifyAutoCompactionEnabled(
+            KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String> harness)
+            throws Exception {
+        KeyedStateBackend<String> backend = harness.getOperator().getKeyedStateBackend();
+        assertThat(backend).isNotNull();
+
+        LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation =
+                ((RocksDBKeyedStateBackend<String>) backend).getKvStateInformation();
+
+        assertThat(kvStateInformation).as("kvStateInformation should not be empty").isNotEmpty();
+
+        for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) {
+            ColumnFamilyHandle handle = stateInfo.columnFamilyHandle;
+            assertThat(handle).isNotNull();
+
+            ColumnFamilyDescriptor descriptor = handle.getDescriptor();
+            ColumnFamilyOptions options = descriptor.getOptions();
+
+            assertThat(options.disableAutoCompactions())
+                    .as(
+                            "Production DB should have auto-compaction enabled for column family: "
+                                    + stateInfo.metaInfo.getName())
+                    .isFalse();
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+            createTestHarness(
+                    TestKeyedFunction keyedFunction,
+                    int maxParallelism,
+                    int parallelism,
+                    int subtaskIndex)
+                    throws Exception {
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(keyedFunction),
+                tuple2 -> tuple2.f0,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                maxParallelism,
+                parallelism,
+                subtaskIndex);
+    }
+
+    private EmbeddedRocksDBStateBackend createStateBackend(boolean useIngestDbRestoreMode) {
+        Configuration config = new Configuration();
+        config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
+
+        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
+        return stateBackend.configure(config, getClass().getClassLoader());
+    }
+
+    private static class TestKeyedFunction
+            extends KeyedProcessFunction<String, Tuple2<String, String>, String> {
+        private ValueState<String> state;
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            super.open(openContext);
+            state =
+                    getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("test-state", String.class));
+        }
+
+        @Override
+        public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out)
+                throws Exception {
+            state.update(value.f1);
+            out.collect(value.f1);
+        }
+    }
+}
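
For context on the toggle used in createStateBackend above: USE_INGEST_DB_RESTORE_MODE is the
public option a job sets to opt into the ingest/clip restore path that this commit hardens. A
minimal sketch of that configuration, assuming only the APIs already used in the test above:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;

    final class IngestRestoreBackendFactory {

        /** Builds an incremental RocksDB backend with ingest-DB restore enabled. */
        static EmbeddedRocksDBStateBackend create(ClassLoader userClassLoader) {
            Configuration config = new Configuration();
            // Same option the test sets; rescaled restores then use ingestDb/clipDb
            // instead of iterating and re-writing key-value pairs.
            config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
            return new EmbeddedRocksDBStateBackend(true).configure(config, userClassLoader);
        }
    }
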
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java
new file mode 100644
index 00000000000..93daf0455e4
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb.restore;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link DistributeStateHandlerHelper}. */
+public class DistributeStateHandlerHelperTest extends TestLogger {
+
+    private static final int NUM_KEY_GROUPS = 128;
+    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, NUM_KEY_GROUPS - 1);
+    private static final int KEY_GROUP_PREFIX_BYTES =
+            CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(NUM_KEY_GROUPS);
+    private static final String CF_NAME = "test-column-family";
+
+    @TempDir private Path tempDir;
+
+    /** Test whether SST files are exported when the key groups are all in range. */
+    @Test
+    public void testAutoCompactionIsDisabled() throws Exception {
+        Path rocksDir = tempDir.resolve("rocksdb_dir");
+        Path dbPath = rocksDir.resolve("db");
+        Path chkDir = rocksDir.resolve("chk");
+        Path exportDir = rocksDir.resolve("export");
+
+        Files.createDirectories(dbPath);
+        Files.createDirectories(exportDir);
+
+        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(2);
+
+        try (RocksDB db = openDB(dbPath.toString(), columnFamilyHandles)) {
+            ColumnFamilyHandle testCfHandler = columnFamilyHandles.get(1);
+
+            // Create SST files and verify their creation
+            for (int i = 0; i < 4; i++) {
+                db.flush(new FlushOptions().setWaitForFlush(true), testCfHandler);
+                for (int j = 10; j < NUM_KEY_GROUPS / 2; j++) {
+                    byte[] bytes = new byte[KEY_GROUP_PREFIX_BYTES];
+                    CompositeKeySerializationUtils.serializeKeyGroup(j, bytes);
+                    db.delete(testCfHandler, bytes);
+                }
+                assertThat(
+                                dbPath.toFile()
+                                        .listFiles(
+                                                (file, name) ->
+                                                        name.toLowerCase().endsWith(".sst")))
+                        .hasSize(i);
+            }
+
+            // Create checkpoint
+            try (Checkpoint checkpoint = Checkpoint.create(db)) {
+                checkpoint.createCheckpoint(chkDir.toString());
+            }
+        }
+
+        // Verify that there are 4 SST files in level 0; with auto compaction enabled, a
+        // compaction would be triggered as soon as the DB is opened.
+        assertThat(chkDir.toFile().listFiles((file, name) -> name.toLowerCase().endsWith(".sst")))
+                .hasSize(4);
+
+        // Create IncrementalLocalKeyedStateHandle for testing
+        IncrementalLocalKeyedStateHandle stateHandle = createTestStateHandle(chkDir.toString());
+
+        try (DistributeStateHandlerHelper helper =
+                createDistributeStateHandlerHelper(
+                        stateHandle, (name) -> new ColumnFamilyOptions())) {
+
+            // This simulates the delay that allows background compaction to clean up SST files if
+            // auto compaction is enabled.
+            Thread.sleep(500);
+            Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>>
+                    exportedColumnFamiliesOut = new HashMap<>();
+            List<IncrementalLocalKeyedStateHandle> skipped = new ArrayList<>();
+
+            Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result =
+                    helper.tryDistribute(exportDir, exportedColumnFamiliesOut);
+            assertThat(result.isLeft()).isTrue();
+            assertThat(exportedColumnFamiliesOut).isNotEmpty();
+            assertThat(skipped).isEmpty();
+        }
+    }
+
+    private RocksDB openDB(String path, ArrayList<ColumnFamilyHandle> columnFamilyHandles)
+            throws RocksDBException {
+
+        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(2);
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        RocksDB.DEFAULT_COLUMN_FAMILY,
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        CF_NAME.getBytes(ConfigConstants.DEFAULT_CHARSET),
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+
+        return RocksDB.open(
+                new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true),
+                path,
+                columnFamilyDescriptors,
+                columnFamilyHandles);
+    }
+
+    /**
+     * Creates a minimal IncrementalLocalKeyedStateHandle for testing. Uses empty metadata to focus
+     * on SST file distribution behavior.
+     */
+    private IncrementalLocalKeyedStateHandle createTestStateHandle(String checkpointDir) {
+        return new IncrementalLocalKeyedStateHandle(
+                UUID.randomUUID(),
+                1L,
+                new DirectoryStateHandle(Paths.get(checkpointDir), 0L),
+                KEY_GROUP_RANGE,
+                new ByteStreamStateHandle("meta", new byte[0]),
+                Collections.emptyList());
+    }
+
+    /** Creates a DistributeStateHandlerHelper with test-specific configuration. */
+    private DistributeStateHandlerHelper createDistributeStateHandlerHelper(
+            IncrementalLocalKeyedStateHandle stateHandle,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory)
+            throws Exception {
+        TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+        TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+        List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
+        stateMetaInfoList.add(
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                                StateDescriptor.Type.VALUE,
+                                CF_NAME,
+                                namespaceSerializer,
+                                stateSerializer)
+                        .snapshot());
+        return new DistributeStateHandlerHelper(
+                stateHandle,
+                stateMetaInfoList,
+                columnFamilyOptionsFactory,
+                new DBOptions().setCreateIfMissing(true),
+                null,
+                null,
+                KEY_GROUP_PREFIX_BYTES,
+                KEY_GROUP_RANGE,
+                "test-operator",
+                0);
+    }
+}
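
Closing illustration: both tests depend on the RocksJava option round-trip, in which
setDisableAutoCompactions(true) can be read back via disableAutoCompactions(). That behavior can
be checked in isolation with the following small snippet, which is not part of the commit:

    import org.rocksdb.ColumnFamilyOptions;

    final class AutoCompactionFlagCheck {
        public static void main(String[] args) {
            try (ColumnFamilyOptions options =
                    new ColumnFamilyOptions().setDisableAutoCompactions(true)) {
                // Prints "true": the flag reads back exactly as written, which is what
                // verifyAutoCompactionEnabled() asserts (expecting false) for the
                // production DB's column families.
                System.out.println(options.disableAutoCompactions());
            }
        }
    }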
