This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9175b88d0d5 MINOR: Move DeferredEventCollection out from 
CoordinatorRuntime (#21348)
9175b88d0d5 is described below

commit 9175b88d0d509ed4e4b62ef99b25966c9dad3688
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 23 17:16:28 2026 +0100

    MINOR: Move DeferredEventCollection out from CoordinatorRuntime (#21348)
    
    Extract DeferredEventCollection from being an inner class of
    CoordinatorRuntime into its own standalone file. This improves code
    organization and adds unit tests for the class.
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  52 ----------
 .../common/runtime/DeferredEventCollection.java    |  76 ++++++++++++++
 .../runtime/DeferredEventCollectionTest.java       | 114 +++++++++++++++++++++
 3 files changed, 190 insertions(+), 52 deletions(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 92afee2cc7e..605af89a662 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -1230,58 +1230,6 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         }
     }
 
-    /**
-     * A collection of {@link DeferredEvent}. When completed, completes all 
the events in the collection
-     * and logs any exceptions thrown.
-     */
-    static class DeferredEventCollection implements DeferredEvent {
-        /**
-         * The logger.
-         */
-        private final Logger log;
-
-        /**
-         * The list of events.
-         */
-        private final List<DeferredEvent> events = new ArrayList<>();
-
-        public DeferredEventCollection(Logger log) {
-            this.log = log;
-        }
-
-        @Override
-        public void complete(Throwable t) {
-            for (DeferredEvent event : events) {
-                try {
-                    event.complete(t);
-                } catch (Throwable e) {
-                    log.error("Completion of event {} failed due to {}.", 
event, e.getMessage(), e);
-                }
-            }
-        }
-
-        public boolean add(DeferredEvent event) {
-            return events.add(event);
-        }
-
-        public int size() {
-            return events.size();
-        }
-
-        @Override
-        public String toString() {
-            return "DeferredEventCollection(events=" + events + ")";
-        }
-
-        public static DeferredEventCollection of(Logger log, DeferredEvent... 
deferredEvents) {
-            DeferredEventCollection collection = new 
DeferredEventCollection(log);
-            for (DeferredEvent deferredEvent : deferredEvents) {
-                collection.add(deferredEvent);
-            }
-            return collection;
-        }
-    }
-
     /**
      * A coordinator write operation.
      *
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollection.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollection.java
new file mode 100644
index 00000000000..883ea2cbcf7
--- /dev/null
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.deferred.DeferredEvent;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A collection of {@link DeferredEvent}. When completed, completes all the 
events in the collection
+ * and logs any exceptions thrown.
+ */
+public class DeferredEventCollection implements DeferredEvent {
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The list of events.
+     */
+    private final List<DeferredEvent> events = new ArrayList<>();
+
+    public DeferredEventCollection(Logger log) {
+        this.log = log;
+    }
+
+    @Override
+    public void complete(Throwable t) {
+        for (DeferredEvent event : events) {
+            try {
+                event.complete(t);
+            } catch (Throwable e) {
+                log.error("Completion of event {} failed due to {}.", event, 
e.getMessage(), e);
+            }
+        }
+    }
+
+    public boolean add(DeferredEvent event) {
+        return events.add(event);
+    }
+
+    public int size() {
+        return events.size();
+    }
+
+    @Override
+    public String toString() {
+        return "DeferredEventCollection(events=" + events + ")";
+    }
+
+    public static DeferredEventCollection of(Logger log, DeferredEvent... 
deferredEvents) {
+        DeferredEventCollection collection = new DeferredEventCollection(log);
+        for (DeferredEvent deferredEvent : deferredEvents) {
+            collection.add(deferredEvent);
+        }
+        return collection;
+    }
+}
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollectionTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollectionTest.java
new file mode 100644
index 00000000000..821f048c8b6
--- /dev/null
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DeferredEventCollectionTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DeferredEventCollectionTest {
+
+    private static final Logger LOG = new 
LogContext().logger(DeferredEventCollectionTest.class);
+
+    @Test
+    public void testAddAndSize() {
+        DeferredEventCollection collection = new DeferredEventCollection(LOG);
+        assertEquals(0, collection.size());
+
+        assertTrue(collection.add(t -> { }));
+        assertEquals(1, collection.size());
+
+        assertTrue(collection.add(t -> { }));
+        assertEquals(2, collection.size());
+    }
+
+    @Test
+    public void testCompleteCallsAllEvents() {
+        List<Throwable> completedWith = new ArrayList<>();
+
+        DeferredEventCollection collection = new DeferredEventCollection(LOG);
+        collection.add(completedWith::add);
+        collection.add(completedWith::add);
+        collection.add(completedWith::add);
+
+        collection.complete(null);
+
+        assertEquals(3, completedWith.size());
+        for (Throwable t : completedWith) {
+            assertEquals(null, t);
+        }
+    }
+
+    @Test
+    public void testCompleteWithException() {
+        List<Throwable> completedWith = new ArrayList<>();
+        RuntimeException exception = new RuntimeException("test exception");
+
+        DeferredEventCollection collection = new DeferredEventCollection(LOG);
+        collection.add(completedWith::add);
+        collection.add(completedWith::add);
+
+        collection.complete(exception);
+
+        assertEquals(2, completedWith.size());
+        for (Throwable t : completedWith) {
+            assertEquals(exception, t);
+        }
+    }
+
+    @Test
+    public void testCompleteContinuesOnEventFailure() {
+        List<Throwable> completedWith = new ArrayList<>();
+
+        DeferredEventCollection collection = new DeferredEventCollection(LOG);
+        collection.add(completedWith::add);
+        collection.add(t -> {
+            throw new RuntimeException("event failure");
+        });
+        collection.add(completedWith::add);
+
+        // Should not throw, and should complete all events
+        collection.complete(null);
+
+        // The first and third events should have been completed
+        assertEquals(2, completedWith.size());
+    }
+
+    @Test
+    public void testOfFactoryMethod() {
+        DeferredEvent event1 = t -> { };
+        DeferredEvent event2 = t -> { };
+
+        DeferredEventCollection collection = DeferredEventCollection.of(LOG, 
event1, event2);
+
+        assertEquals(2, collection.size());
+    }
+
+    @Test
+    public void testOfFactoryMethodEmpty() {
+        DeferredEventCollection collection = DeferredEventCollection.of(LOG);
+        assertEquals(0, collection.size());
+    }
+}

Reply via email to