This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4bb815a753a04767b72d9161e182db6d39b6a30c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 14:27:10 2025 +0200

    [FLINK-38415][checkpoint] Disable auto compaction to prevent IndexOutOfBounds
---
 .../state/rocksdb/RocksDBKeyedStateBackend.java    |   5 +
 .../state/rocksdb/restore/RestoredDBInstance.java  |   9 +-
 .../RocksDBAutoCompactionIngestRestoreTest.java    | 177 ++++++++++++++++++
 .../restore/DistributeStateHandlerHelperTest.java  | 200 +++++++++++++++++++++
 4 files changed, 388 insertions(+), 3 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
index edae8e5f391..34d1c019dd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java
@@ -454,6 +454,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
     }
 
+    @VisibleForTesting
+    LinkedHashMap<String, RocksDbKvStateInfo> getKvStateInformation() {
+        return kvStateInformation;
+    }
+
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
     @Override
     public void dispose() {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
index afbe27bce9c..9b5c4b74ad3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
@@ -100,10 +100,14 @@ public class RestoredDBInstance implements AutoCloseable {
             Long writeBufferManagerCapacity)
             throws Exception {
 
+        Function<String, ColumnFamilyOptions> tempDBCfFactory =
+                stateName ->
+                        columnFamilyOptionsFactory.apply(stateName).setDisableAutoCompactions(true);
+
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 createColumnFamilyDescriptors(
                         stateMetaInfoSnapshots,
-                        columnFamilyOptionsFactory,
+                        tempDBCfFactory,
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity,
                         false);
@@ -118,8 +122,7 @@ public class RestoredDBInstance implements AutoCloseable {
                         restoreSourcePath.toString(),
                         columnFamilyDescriptors,
                         columnFamilyHandles,
-                        RocksDBOperationUtils.createColumnFamilyOptions(
-                                columnFamilyOptionsFactory, "default"),
+                        RocksDBOperationUtils.createColumnFamilyOptions(tempDBCfFactory, "default"),
                         dbOptions);
 
         return new RestoredDBInstance(
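
The hunk above is the core of the fix: the temporary DB that RestoredDBInstance opens during incremental restore exists only so that its SST files can be exported into the target DB, so all of its column families (including "default") are now created with auto compaction disabled, while the production DB keeps the unmodified factory. A standalone sketch of the wrapping pattern, using only the RocksJava API (class and method names here are illustrative, not part of the patch):

    import org.rocksdb.ColumnFamilyOptions;

    import java.util.function.Function;

    /** Illustrative sketch of the tempDBCfFactory pattern above. */
    public class TempDbOptionsSketch {

        /**
         * Wraps a column family options factory so that every column family
         * created through it has auto compaction disabled.
         */
        static Function<String, ColumnFamilyOptions> disableAutoCompactions(
                Function<String, ColumnFamilyOptions> baseFactory) {
            // setDisableAutoCompactions returns the mutated options instance,
            // so the wrapper stays a single lambda.
            return stateName -> baseFactory.apply(stateName).setDisableAutoCompactions(true);
        }
    }
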
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java
new file mode 100644
index 00000000000..165d1d7df33
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
+import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+
+import java.util.LinkedHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test to verify that auto-compaction is correctly configured during RocksDB incremental restore
+ * with ingest DB mode. This test ensures that production DBs keep auto-compaction enabled, while
+ * temporary DBs used during restore have auto-compaction disabled so that background compaction
+ * cannot remove SST files while they are still being exported.
+ */
+public class RocksDBAutoCompactionIngestRestoreTest {
+
+    @TempDir private java.nio.file.Path tempFolder;
+
+    private static final int MAX_PARALLELISM = 10;
+
+    @Test
+    public void testAutoCompactionEnabledWithIngestDBRestore() throws Exception {
+        // Create two subtask snapshots and merge them to trigger the multi-state-handle scenario
+        // required for reproducing the ingest DB restore path
+        OperatorSubtaskState operatorSubtaskState =
+                AbstractStreamOperatorTestHarness.repackageState(
+                        createSubtaskSnapshot(0), createSubtaskSnapshot(1));
+
+        OperatorSubtaskState initState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        operatorSubtaskState, MAX_PARALLELISM, 2, 1, 0);
+
+        // Restore with ingest DB mode and verify auto-compaction
+        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+                harness = createTestHarness(new TestKeyedFunction(), MAX_PARALLELISM, 1, 0)) {
+
+            EmbeddedRocksDBStateBackend stateBackend = createStateBackend(true);
+            harness.setStateBackend(stateBackend);
+            harness.setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://" + tempFolder.resolve("checkpoint-restore").toAbsolutePath()));
+
+            harness.initializeState(initState);
+            harness.open();
+
+            verifyAutoCompactionEnabled(harness);
+        }
+    }
+
+    private OperatorSubtaskState createSubtaskSnapshot(int subtaskIndex) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+                harness =
+                        createTestHarness(
+                                new TestKeyedFunction(), MAX_PARALLELISM, 2, subtaskIndex)) {
+
+            harness.setStateBackend(createStateBackend(false));
+            harness.setCheckpointStorage(
+                    new FileSystemCheckpointStorage(
+                            "file://"
+                                    + tempFolder
+                                            .resolve("checkpoint-subtask" + subtaskIndex)
+                                            .toAbsolutePath()));
+            harness.open();
+
+            // Create an empty snapshot - data content doesn't matter for this test
+            return harness.snapshot(0, 0);
+        }
+    }
+
+    private void verifyAutoCompactionEnabled(
+            KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String> harness)
+            throws Exception {
+        KeyedStateBackend<String> backend = harness.getOperator().getKeyedStateBackend();
+        assertThat(backend).isNotNull();
+
+        LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation =
+                ((RocksDBKeyedStateBackend<String>) backend).getKvStateInformation();
+
+        assertThat(kvStateInformation).as("kvStateInformation should not be empty").isNotEmpty();
+
+        for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) {
+            ColumnFamilyHandle handle = stateInfo.columnFamilyHandle;
+            assertThat(handle).isNotNull();
+
+            ColumnFamilyDescriptor descriptor = handle.getDescriptor();
+            ColumnFamilyOptions options = descriptor.getOptions();
+
+            assertThat(options.disableAutoCompactions())
+                    .as(
+                            "Production DB should have auto-compaction enabled for column family: "
+                                    + stateInfo.metaInfo.getName())
+                    .isFalse();
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
+            createTestHarness(
+                    TestKeyedFunction keyedFunction,
+                    int maxParallelism,
+                    int parallelism,
+                    int subtaskIndex)
+                    throws Exception {
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                new KeyedProcessOperator<>(keyedFunction),
+                tuple2 -> tuple2.f0,
+                BasicTypeInfo.STRING_TYPE_INFO,
+                maxParallelism,
+                parallelism,
+                subtaskIndex);
+    }
+
+    private EmbeddedRocksDBStateBackend createStateBackend(boolean useIngestDbRestoreMode) {
+        Configuration config = new Configuration();
+        config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);
+
+        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
+        return stateBackend.configure(config, getClass().getClassLoader());
+    }
+
+    private static class TestKeyedFunction
+            extends KeyedProcessFunction<String, Tuple2<String, String>, String> {
+        private ValueState<String> state;
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            super.open(openContext);
+            state =
+                    getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("test-state", String.class));
+        }
+
+        @Override
+        public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out)
+                throws Exception {
+            state.update(value.f1);
+            out.collect(value.f1);
+        }
+    }
+}
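
The test above exercises the fix end-to-end through the public configuration surface rather than by calling into the restore internals. A standalone sketch of that backend setup, with option and class names taken from the test (the ClassLoader argument is illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
    import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;

    /** Illustrative sketch, not part of the patch. */
    public class IngestDbBackendSketch {

        public static EmbeddedRocksDBStateBackend createIngestDbBackend(ClassLoader classLoader) {
            Configuration config = new Configuration();
            // Opt in to the ingest DB incremental restore path that this fix hardens.
            config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
            // true enables incremental checkpoints, matching the test above.
            EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
            return backend.configure(config, classLoader);
        }
    }
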
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java
new file mode 100644
index 00000000000..93daf0455e4
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelperTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb.restore;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link DistributeStateHandlerHelper}. */
+public class DistributeStateHandlerHelperTest extends TestLogger {
+
+    private static final int NUM_KEY_GROUPS = 128;
+    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, NUM_KEY_GROUPS - 1);
+    private static final int KEY_GROUP_PREFIX_BYTES =
+            CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(NUM_KEY_GROUPS);
+    private static final String CF_NAME = "test-column-family";
+
+    @TempDir private Path tempDir;
+
+    /** Tests that the temporary restore DB has auto compaction disabled, so its SST files survive long enough to be exported. */
+    @Test
+    public void testAutoCompactionIsDisabled() throws Exception {
+        Path rocksDir = tempDir.resolve("rocksdb_dir");
+        Path dbPath = rocksDir.resolve("db");
+        Path chkDir = rocksDir.resolve("chk");
+        Path exportDir = rocksDir.resolve("export");
+
+        Files.createDirectories(dbPath);
+        Files.createDirectories(exportDir);
+
+        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(2);
+
+        try (RocksDB db = openDB(dbPath.toString(), columnFamilyHandles)) {
+            ColumnFamilyHandle testCfHandler = columnFamilyHandles.get(1);
+
+            // Create SST files and verify their creation
+            for (int i = 0; i < 4; i++) {
+                db.flush(new FlushOptions().setWaitForFlush(true), testCfHandler);
+                for (int j = 10; j < NUM_KEY_GROUPS / 2; j++) {
+                    byte[] bytes = new byte[KEY_GROUP_PREFIX_BYTES];
+                    CompositeKeySerializationUtils.serializeKeyGroup(j, bytes);
+                    db.delete(testCfHandler, bytes);
+                }
+                assertThat(
+                                dbPath.toFile()
+                                        .listFiles(
+                                                (file, name) ->
+                                                        name.toLowerCase().endsWith(".sst")))
+                        .hasSize(i);
+            }
+
+            // Create checkpoint
+            try (Checkpoint checkpoint = Checkpoint.create(db)) {
+                checkpoint.createCheckpoint(chkDir.toString());
+            }
+        }
+
+        // Verify there are 4 SST files in level 0; compaction will be triggered once the DB is
+        // opened.
+        assertThat(chkDir.toFile().listFiles((file, name) -> name.toLowerCase().endsWith(".sst")))
+                .hasSize(4);
+
+        // Create IncrementalLocalKeyedStateHandle for testing
+        IncrementalLocalKeyedStateHandle stateHandle = createTestStateHandle(chkDir.toString());
+
+        try (DistributeStateHandlerHelper helper =
+                createDistributeStateHandlerHelper(
+                        stateHandle, (name) -> new ColumnFamilyOptions())) {
+
+            // This simulates the delay that allows background compaction to clean up SST files if
+            // auto compaction is enabled.
+            Thread.sleep(500);
+            Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>>
+                    exportedColumnFamiliesOut = new HashMap<>();
+            List<IncrementalLocalKeyedStateHandle> skipped = new ArrayList<>();
+
+            Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result =
+                    helper.tryDistribute(exportDir, exportedColumnFamiliesOut);
+            assertThat(result.isLeft()).isTrue();
+            assertThat(exportedColumnFamiliesOut).isNotEmpty();
+            assertThat(skipped).isEmpty();
+        }
+    }
+
+    private RocksDB openDB(String path, ArrayList<ColumnFamilyHandle> columnFamilyHandles)
+            throws RocksDBException {
+
+        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(2);
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        RocksDB.DEFAULT_COLUMN_FAMILY,
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+        columnFamilyDescriptors.add(
+                new ColumnFamilyDescriptor(
+                        CF_NAME.getBytes(ConfigConstants.DEFAULT_CHARSET),
+                        new ColumnFamilyOptions().setDisableAutoCompactions(true)));
+
+        return RocksDB.open(
+                new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true),
+                path,
+                columnFamilyDescriptors,
+                columnFamilyHandles);
+    }
+
+    /**
+     * Creates a minimal IncrementalLocalKeyedStateHandle for testing. Uses empty metadata to focus
+     * on SST file distribution behavior.
+     */
+    private IncrementalLocalKeyedStateHandle createTestStateHandle(String checkpointDir) {
+        return new IncrementalLocalKeyedStateHandle(
+                UUID.randomUUID(),
+                1L,
+                new DirectoryStateHandle(Paths.get(checkpointDir), 0L),
+                KEY_GROUP_RANGE,
+                new ByteStreamStateHandle("meta", new byte[0]),
+                Collections.emptyList());
+    }
+
+    /** Creates a DistributeStateHandlerHelper with test-specific configuration. */
+    private DistributeStateHandlerHelper createDistributeStateHandlerHelper(
+            IncrementalLocalKeyedStateHandle stateHandle,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory)
+            throws Exception {
+        TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+        TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+        List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
+        stateMetaInfoList.add(
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                                StateDescriptor.Type.VALUE,
+                                CF_NAME,
+                                namespaceSerializer,
+                                stateSerializer)
+                        .snapshot());
+        return new DistributeStateHandlerHelper(
+                stateHandle,
+                stateMetaInfoList,
+                columnFamilyOptionsFactory,
+                new DBOptions().setCreateIfMissing(true),
+                null,
+                null,
+                KEY_GROUP_PREFIX_BYTES,
+                KEY_GROUP_RANGE,
+                "test-operator",
+                0);
+    }
+}
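
A note on the magic number 4 in the test above: RocksDB schedules a level-0 compaction once the number of L0 files reaches level0_file_num_compaction_trigger, which defaults to 4, so reopening the checkpoint with auto compaction enabled would compact the SST files away while tryDistribute is still exporting them. A standalone sketch (not part of the patch) that reads the relevant defaults back via RocksJava:

    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.RocksDB;

    /** Illustrative sketch, not part of the patch. */
    public class CompactionDefaultsSketch {

        public static void main(String[] args) {
            RocksDB.loadLibrary();
            try (ColumnFamilyOptions options = new ColumnFamilyOptions()) {
                // Level-0 compaction trigger: 4 files by default.
                System.out.println(options.level0FileNumCompactionTrigger());
                // Auto compaction is enabled by default, hence the explicit
                // setDisableAutoCompactions(true) on the temporary restore DB.
                System.out.println(options.disableAutoCompactions());
            }
        }
    }
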
