This is an automated email from the ASF dual-hosted git repository.

ChenSammi pushed a commit to branch HDDS-13513_Event_Notification_FeatureBranch
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-13513_Event_Notification_FeatureBranch by this push:
     new e5f509d8801 HDDS-14007. EventNotification: Plumb in event listener plugins to run end-to-end (#10237)
e5f509d8801 is described below

commit e5f509d88013ee6ded28359863b14200de8d3501
Author: gardenia <[email protected]>
AuthorDate: Wed May 20 10:59:18 2026 +0100

    HDDS-14007. EventNotification: Plumb in event listener plugins to run end-to-end (#10237)
---
 .../ozone/om/eventlistener/OMEventListener.java    |  2 +-
 hadoop-ozone/integration-test/pom.xml              |  5 ++
 .../om/TestOMEventListenerPluginLifecycle.java     | 95 ++++++++++++++++++++++
 .../om/eventlistener/NoOpOMEventListener.java}     | 35 ++++++--
 .../OMEventListenerKafkaPublisher.java             |  2 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 20 +++++
 .../OMEventListenerPluginManager.java              |  6 +-
 .../TestOMEventListenerPluginManager.java          | 62 ++++++--------
 8 files changed, 181 insertions(+), 46 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
index 5f20301d458..0cc50230190 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
@@ -28,5 +28,5 @@ public interface OMEventListener {
 
   void start();
 
-  void shutdown();
+  void stop();
 }
diff --git a/hadoop-ozone/integration-test/pom.xml b/hadoop-ozone/integration-test/pom.xml
index 8393e496f29..1e9b6510375 100644
--- a/hadoop-ozone/integration-test/pom.xml
+++ b/hadoop-ozone/integration-test/pom.xml
@@ -450,6 +450,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-manager-plugins</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>ozone-mini-cluster</artifactId>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java
new file mode 100644
index 00000000000..c3dc17d9e5a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+
+import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.om.eventlistener.NoOpOMEventListener;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListener;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Integration test to verify OMEventListener plugin lifecycle within OzoneManager.
+ */
+@Timeout(300)
+public class TestOMEventListenerPluginLifecycle {
+
+  private static MiniOzoneCluster cluster = null;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
+
+    // Configure NoOpOMEventListener
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+    conf.set("ozone.om.plugin.destination.foo.classname", 
NoOpOMEventListener.class.getName());
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testPluginLifecycleWithRestart() throws Exception {
+    OzoneManager om = cluster.getOzoneManager();
+    OMEventListenerPluginManager pluginManager = 
om.getEventListenerPluginManager();
+
+    List<OMEventListener> plugins = pluginManager.getLoaded();
+    Assertions.assertEquals(1, plugins.size(), "Should have 1 plugin loaded");
+
+    NoOpOMEventListener plugin = (NoOpOMEventListener) plugins.get(0);
+
+    Assertions.assertTrue(plugin.isInitialized(), "Plugin should be 
initialized");
+    Assertions.assertTrue(plugin.isStarted(), "Plugin should be started");
+    Assertions.assertFalse(plugin.isStopped(), "Plugin should not be stopped");
+
+    // Restart OM to verify restart lifecycle
+    cluster.restartOzoneManager();
+
+    // Get the new OM instance and plugin
+    om = cluster.getOzoneManager();
+    pluginManager = om.getEventListenerPluginManager();
+
+    plugins = pluginManager.getLoaded();
+    Assertions.assertEquals(1, plugins.size(), "Should have 1 plugin loaded 
after restart");
+    plugin = (NoOpOMEventListener) plugins.get(0);
+
+    Assertions.assertTrue(plugin.isInitialized(), "Plugin should be 
initialized after restart");
+    Assertions.assertTrue(plugin.isStarted(), "Plugin should be started after 
restart");
+    Assertions.assertFalse(plugin.isStopped(), "Plugin should not be stopped 
after restart");
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
similarity index 57%
copy from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
copy to hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
index 5f20301d458..82c7af149e8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
+++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
@@ -20,13 +20,38 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 
 /**
- * Interface for event listener plugin implementations.
+ * A no-op implementation of {@link OMEventListener} for testing and as a template.
  */
-public interface OMEventListener {
+public class NoOpOMEventListener implements OMEventListener {
 
-  void initialize(OzoneConfiguration conf, OMEventListenerPluginContext 
pluginContext);
+  private boolean initialized = false;
+  private boolean started = false;
+  private boolean stopped = false;
 
-  void start();
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext 
pluginContext) {
+    initialized = true;
+  }
 
-  void shutdown();
+  @Override
+  public void start() {
+    started = true;
+  }
+
+  @Override
+  public void stop() {
+    stopped = true;
+  }
+
+  public boolean isInitialized() {
+    return initialized;
+  }
+
+  public boolean isStarted() {
+    return started;
+  }
+
+  public boolean isStopped() {
+    return stopped;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
index da0df189b2e..53724129b0d 100644
--- a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
+++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
@@ -88,7 +88,7 @@ public void start() {
   }
 
   @Override
-  public void shutdown() {
+  public void stop() {
     try {
       kafkaClient.shutdown();
     } catch (IOException ex) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 7a9d66f86df..93a601f7d08 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -231,6 +231,7 @@
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -398,6 +399,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private VolumeManager volumeManager;
   private BucketManager bucketManager;
   private KeyManager keyManager;
+  private OMEventListenerPluginManager eventListenerPluginManager;
   private PrefixManagerImpl prefixManager;
   private final UpgradeFinalizer<OzoneManager> upgradeFinalizer;
   private ExecutorService edekCacheLoader = null;
@@ -969,6 +971,8 @@ private void instantiateServices(boolean withNewSnapshot) throws IOException {
     prefixManager = new PrefixManagerImpl(this, metadataManager, true);
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
         perfMetrics);
+    eventListenerPluginManager = new OMEventListenerPluginManager(this,
+        configuration);
     // If authorizer is not initialized or the authorizer is Native
     // re-initialize the authorizer, else for non-native authorizer
     // like ranger we can reuse previous value if it is initialized
@@ -1688,6 +1692,10 @@ public KeyManager getKeyManager() {
     return keyManager;
   }
 
+  public OMEventListenerPluginManager getEventListenerPluginManager() {
+    return eventListenerPluginManager;
+  }
+
   @VisibleForTesting
   public OMStorage getOmStorage() {
     return omStorage;
@@ -1865,6 +1873,7 @@ public void start() throws IOException {
     }
 
     keyManager.start(configuration);
+    eventListenerPluginManager.start(configuration);
 
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
@@ -1913,6 +1922,7 @@ public void restart() throws IOException {
 
     metadataManager.start(configuration);
     keyManager.start(configuration);
+    eventListenerPluginManager.start(configuration);
     startSecretManagerIfNecessary();
 
     // Set metrics and start metrics back ground thread
@@ -2399,6 +2409,9 @@ public boolean stop() {
         isOmGrpcServerRunning = false;
       }
       keyManager.stop();
+      if (eventListenerPluginManager != null) {
+        eventListenerPluginManager.stop();
+      }
       stopSecretManager();
 
       if (scmTopologyClient != null) {
@@ -4065,6 +4078,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation)
     return installCheckpoint(leaderId, checkpointLocation, checkpointTrxnInfo);
   }
 
+  @SuppressWarnings("methodlength")
   TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
       TransactionInfo checkpointTrxnInfo) throws Exception {
     long startTime = Time.monotonicNow();
@@ -4073,6 +4087,9 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
     try {
       // Stop Background services
       keyManager.stop();
+      if (eventListenerPluginManager != null) {
+        eventListenerPluginManager.stop();
+      }
       stopSecretManager();
       stopTrashEmptier();
       omSnapshotManager.invalidateCache();
@@ -4085,6 +4102,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
           "installing the new checkpoint.");
       // Stop the checkpoint install process and restart the services.
       keyManager.start(configuration);
+      eventListenerPluginManager.start(configuration);
       startSecretManagerIfNecessary();
       startTrashEmptier(configuration);
       throw e;
@@ -4163,6 +4181,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
       } else {
         // OM DB is not stopped. Start the services.
         keyManager.start(configuration);
+        eventListenerPluginManager.start(configuration);
         startSecretManagerIfNecessary();
         startTrashEmptier(configuration);
         omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
@@ -4388,6 +4407,7 @@ private void reloadOMState() throws IOException {
     // Restart required services
     metadataManager.start(configuration);
     keyManager.start(configuration);
+    eventListenerPluginManager.start(configuration);
     startSecretManagerIfNecessary();
     startTrashEmptier(configuration);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
index 1c5ff534335..79674dc20ff 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
@@ -46,7 +46,7 @@ public List<OMEventListener> getLoaded() {
     return plugins;
   }
 
-  public void startAll() {
+  public void start(OzoneConfiguration conf) {
     for (OMEventListener plugin : plugins) {
       try {
         plugin.start();
@@ -57,10 +57,10 @@ public void startAll() {
     }
   }
 
-  public void shutdownAll() {
+  public void stop() {
     for (OMEventListener plugin : plugins) {
       try {
-        plugin.shutdown();
+        plugin.stop();
       } catch (Exception ex) {
         LOG.error("Failed to shut down event listener plugin {}",
             plugin.getClass().getName(), ex);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
index 346f5c1c79f..29ca8d48883 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
@@ -50,51 +50,20 @@ static List<String> getLoadedPlugins(OMEventListenerPluginManager pluginManager)
     return loadedClasses;
   }
 
-  private static class BrokenFooPlugin {
+  private static class BrokenPlugin {
 
   }
 
   /**
-   * A dummy plugin implementation for testing.
+   * Dummy plugin implementation for testing.
    */
-  public static class FooPlugin implements OMEventListener {
-
-    private boolean initialized = false;
-    private boolean started = false;
-    private boolean shutdown = false;
-
-    @Override
-    public void initialize(OzoneConfiguration conf, 
OMEventListenerPluginContext pluginContext) {
-      initialized = true;
-    }
-
-    @Override
-    public void start() {
-      started = true;
-    }
-
-    @Override
-    public void shutdown() {
-      shutdown = true;
-    }
-
-    public boolean isInitialized() {
-      return initialized;
-    }
-
-    public boolean isStarted() {
-      return started;
-    }
-
-    public boolean isShutdown() {
-      return shutdown;
-    }
+  public static class FooPlugin extends NoOpOMEventListener {
   }
 
   /**
    * Another dummy plugin implementation for testing.
    */
-  public static class BarPlugin extends FooPlugin {
+  public static class BarPlugin extends NoOpOMEventListener {
   }
 
   @Test
@@ -154,7 +123,7 @@ public void testPluginClassDoesNotExist() {
   public void testPluginClassDoesNotImplementInterface() {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set("ozone.om.plugin.destination.foo", "enabled");
-    conf.set("ozone.om.plugin.destination.foo.classname", 
BrokenFooPlugin.class.getName());
+    conf.set("ozone.om.plugin.destination.foo.classname", 
BrokenPlugin.class.getName());
 
     OMEventListenerPluginManager pluginManager = new 
OMEventListenerPluginManager(ozoneManager, conf);
 
@@ -162,4 +131,25 @@ public void testPluginClassDoesNotImplementInterface() {
         Arrays.asList(),
         getLoadedPlugins(pluginManager));
   }
+
+  @Test
+  public void testLifecycle() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+    conf.set("ozone.om.plugin.destination.foo.classname", 
FooPlugin.class.getName());
+
+    OMEventListenerPluginManager pluginManager = new 
OMEventListenerPluginManager(ozoneManager, conf);
+    FooPlugin plugin = (FooPlugin) pluginManager.getLoaded().get(0);
+
+    Assertions.assertTrue(plugin.isInitialized());
+    Assertions.assertFalse(plugin.isStarted());
+    Assertions.assertFalse(plugin.isStopped());
+
+    pluginManager.start(conf);
+    Assertions.assertTrue(plugin.isStarted());
+    Assertions.assertFalse(plugin.isStopped());
+
+    pluginManager.stop();
+    Assertions.assertTrue(plugin.isStopped());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to