This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4be6d692303 [FLINK-36293][state] make sure register is closed before verification (#25373) 4be6d692303 is described below commit 4be6d6923036813c721ce303dbdedfd548422eef Author: Luke Chen <show...@gmail.com> AuthorDate: Fri Jan 17 11:34:05 2025 +0900 [FLINK-36293][state] make sure register is closed before verification (#25373) --- .../org/apache/flink/util/AbstractAutoCloseableRegistry.java | 4 ++-- .../flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java index 78456a8b231..e256139412a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java @@ -137,13 +137,13 @@ public abstract class AbstractAutoCloseableRegistry< /** * Does the actual registration of the closeable with the registry map. This should not do any - * long running or potentially blocking operations as is is executed under the registry's lock. + * long running or potentially blocking operations as it is executed under the registry's lock. */ protected abstract void doRegister(@Nonnull C closeable, @Nonnull Map<R, T> closeableMap); /** * Does the actual un-registration of the closeable from the registry map. This should not do - * any long running or potentially blocking operations as is is executed under the registry's + * any long running or potentially blocking operations as it is executed under the registry's * lock. */ protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull Map<R, T> closeableMap); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java index cfd5d3d8ca6..52143c5caca 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java @@ -82,8 +82,13 @@ public class RocksDBWriteBatchWrapperTest { cancellationCheckInterval, batchSizeBytes)) { registry.registerCloseable(writeBatchWrapper.getCancelCloseable()); + // After the `writeStartedFuture` completes, the registry will start to close. writeStartedFuture.complete(null); + // In the infinite loop, we want to verify that the `put` method will check cancellation + // state on every `batch.count() % cancellationCheckInterval == 0`. We set + // cancellationCheckInterval to 1, So, we expect it will throw CancelTaskException + // no later than batch count becoming 2 in this test case. //noinspection InfiniteLoopStatement for (int i = 0; ; i++) { try { @@ -96,6 +101,11 @@ public class RocksDBWriteBatchWrapperTest { // make sure that cancellation is triggered earlier than periodic flush // but allow some delay of cancellation propagation assertThat(i).isLessThan(cancellationCheckInterval * 2); + if (i == 0) { + // make sure the registry is closed at least after the first run, so that we + // can verify the cancellation check is validating correctly. + cancellationRequestedFuture.join(); + } } } }