This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 6a2586f7a [server] Add null safety for TimerTaskEntry removal (#1872)
6a2586f7a is described below
commit 6a2586f7aad8f2976d3c8f9ba463a528f54a9cee
Author: Rion Williams <[email protected]>
AuthorDate: Wed Dec 24 09:31:25 2025 -0600
[server] Add null safety for TimerTaskEntry removal (#1872)
---
.../fluss/server/utils/timer/TimerTaskEntry.java | 5 +-
.../server/utils/timer/TimerTaskEntryTest.java | 132 +++++++++++++++++++++
2 files changed, 134 insertions(+), 3 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimerTaskEntry.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimerTaskEntry.java
index 09a873520..18cfbaf22 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimerTaskEntry.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimerTaskEntry.java
@@ -57,15 +57,14 @@ public class TimerTaskEntry implements
Comparable<TimerTaskEntry> {
}
void remove() {
- TimerTaskList currentList = list;
+ TimerTaskList currentList;
// If remove is called when another thread is moving the entry from a
task
// entry list to another, this may fail to remove the entry due to the
change of value
// of list. Thus, we retry until the list becomes null. In a rare
case, this thread
// sees null and exits the loop, but the other thread insert the entry
to another list
// later.
- while (list != null) {
+ while ((currentList = list) != null) {
currentList.remove(this);
- currentList = list;
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/utils/timer/TimerTaskEntryTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/utils/timer/TimerTaskEntryTest.java
new file mode 100644
index 000000000..cfbdf2d90
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/utils/timer/TimerTaskEntryTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.utils.timer;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link org.apache.fluss.server.utils.timer.TimerTaskEntry}. */
+public class TimerTaskEntryTest {
+
+ @Test
+ void testRemoveEnsuresCurrentListNullSafety() throws InterruptedException {
+ // Create two lists to reproduce the values that we are working
+ // with being added/removed. We will oscillate between adding
+ // and removing these elements until we encounter a NPE
+ AtomicInteger sharedTaskCounter = new AtomicInteger(0);
+ TimerTaskList primaryList = new TimerTaskList(sharedTaskCounter);
+ TimerTaskList secondaryList = new TimerTaskList(sharedTaskCounter);
+
+ // Set up our initial task that will handle coordinating this
+ // reproduction behavior
+ TestTask task = new TestTask(0L);
+ TimerTaskEntry entry = new TimerTaskEntry(task, 10L);
+ primaryList.add(entry);
+
+ // Container for any NullPointerException caught during remove()
+ AtomicReference<NullPointerException> thrownException = new
AtomicReference<>();
+
+ // Latch to handle coordinating addition/removal threads
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Create a thread responsible for continually removing entries, which
+ // will be responsible for triggering the exception
+ Thread removalThread =
+ new Thread(
+ () -> {
+ try {
+ latch.await();
+ // Continually remove elements from the task
(forward-oscillation)
+ for (int i = 0; i < 10000; i++) {
+ entry.remove();
+ }
+ } catch (NullPointerException e) {
+ thrownException.set(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Create a separate thread for adding the entry while the removal
thread is
+ // still executing which results in the expected null reference
+ Thread additionThread =
+ new Thread(
+ () -> {
+ try {
+ // Wait for the initial removal to complete
+ latch.await();
+ // Add the entry to our separate list while
the removal thread is
+ // still verifying the condition (resulting in
our null list within
+ // the internal removal call, and our
exception)
+ for (int i = 0; i < 10000; i++) {
+ // Determine which list to add to the task
+ // (backwards-oscillation)
+ TimerTaskList currentList = entry.list;
+ if (currentList == null || currentList ==
primaryList) {
+ // If the entry is not in any list or
in the primary list,
+ // move it to the secondary list
+ secondaryList.add(entry);
+ } else if (currentList == secondaryList) {
+ // If the entry is in the secondary
list, move it to the
+ // primary list
+ primaryList.add(entry);
+ }
+ Thread.yield();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Start both threads
+ removalThread.start();
+ additionThread.start();
+
+ // Release both threads to trigger our race condition
+ latch.countDown();
+
+ // Wait for threads to complete
+ removalThread.join();
+ additionThread.join();
+
+ // Attempt to remove the last entry (to ensure empty list)
+ entry.remove();
+
+ // Verify the list is empty after entry removal and ensure
+ // counter reflects the correct state
+ assertThat(entry.list).isNull();
+ assertThat(sharedTaskCounter.get()).isEqualTo(0);
+
+ // Assert that no exception was originated
+ assertThat(thrownException.get()).isNull();
+ }
+
+ private static class TestTask extends TimerTask {
+ public TestTask(long delayMs) {
+ super(delayMs);
+ }
+
+ @Override
+ public void run() {}
+ }
+}