This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4be6d692303 [FLINK-36293][state] make sure register is closed before verification (#25373)
4be6d692303 is described below

commit 4be6d6923036813c721ce303dbdedfd548422eef
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Jan 17 11:34:05 2025 +0900

    [FLINK-36293][state] make sure register is closed before verification (#25373)
---
 .../org/apache/flink/util/AbstractAutoCloseableRegistry.java   |  4 ++--
 .../flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java      | 10 ++++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java
index 78456a8b231..e256139412a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractAutoCloseableRegistry.java
@@ -137,13 +137,13 @@ public abstract class AbstractAutoCloseableRegistry<
 
     /**
      * Does the actual registration of the closeable with the registry map. This should not do any
-     * long running or potentially blocking operations as is is executed under the registry's lock.
+     * long running or potentially blocking operations as it is executed under the registry's lock.
      */
     protected abstract void doRegister(@Nonnull C closeable, @Nonnull Map<R, T> closeableMap);
 
     /**
      * Does the actual un-registration of the closeable from the registry map. This should not do
-     * any long running or potentially blocking operations as is is executed under the registry's
+     * any long running or potentially blocking operations as it is executed under the registry's
      * lock.
      */
     protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull Map<R, T> closeableMap);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java
index cfd5d3d8ca6..52143c5caca 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBWriteBatchWrapperTest.java
@@ -82,8 +82,13 @@ public class RocksDBWriteBatchWrapperTest {
                                 cancellationCheckInterval,
                                 batchSizeBytes)) {
             registry.registerCloseable(writeBatchWrapper.getCancelCloseable());
+            // After the `writeStartedFuture` completes, the registry will start to close.
             writeStartedFuture.complete(null);
 
+            // In the infinite loop, we want to verify that the `put` method will check cancellation
+            // state on every `batch.count() % cancellationCheckInterval == 0`. We set
+            // cancellationCheckInterval to 1, So, we expect it will throw CancelTaskException
+            // no later than batch count becoming 2 in this test case.
             //noinspection InfiniteLoopStatement
             for (int i = 0; ; i++) {
                 try {
@@ -96,6 +101,11 @@ public class RocksDBWriteBatchWrapperTest {
                 // make sure that cancellation is triggered earlier than periodic flush
                 // but allow some delay of cancellation propagation
                 assertThat(i).isLessThan(cancellationCheckInterval * 2);
+                if (i == 0) {
+                    // make sure the registry is closed at least after the first run, so that we
+                    // can verify the cancellation check is validating correctly.
+                    cancellationRequestedFuture.join();
+                }
             }
         }
     }

Reply via email to