This is an automated email from the ASF dual-hosted git repository.
ChenSammi pushed a commit to branch HDDS-13513_Event_Notification_FeatureBranch
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to
refs/heads/HDDS-13513_Event_Notification_FeatureBranch by this push:
new e5f509d8801 HDDS-14007. EventNotification: Plumb in event listener
plugins to run end-to-end (#10237)
e5f509d8801 is described below
commit e5f509d88013ee6ded28359863b14200de8d3501
Author: gardenia <[email protected]>
AuthorDate: Wed May 20 10:59:18 2026 +0100
HDDS-14007. EventNotification: Plumb in event listener plugins to run
end-to-end (#10237)
---
.../ozone/om/eventlistener/OMEventListener.java | 2 +-
hadoop-ozone/integration-test/pom.xml | 5 ++
.../om/TestOMEventListenerPluginLifecycle.java | 95 ++++++++++++++++++++++
.../om/eventlistener/NoOpOMEventListener.java} | 35 ++++++--
.../OMEventListenerKafkaPublisher.java | 2 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 20 +++++
.../OMEventListenerPluginManager.java | 6 +-
.../TestOMEventListenerPluginManager.java | 62 ++++++--------
8 files changed, 181 insertions(+), 46 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
index 5f20301d458..0cc50230190 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
@@ -28,5 +28,5 @@ public interface OMEventListener {
void start();
- void shutdown();
+ void stop();
}
diff --git a/hadoop-ozone/integration-test/pom.xml
b/hadoop-ozone/integration-test/pom.xml
index 8393e496f29..1e9b6510375 100644
--- a/hadoop-ozone/integration-test/pom.xml
+++ b/hadoop-ozone/integration-test/pom.xml
@@ -450,6 +450,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone-manager-plugins</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-mini-cluster</artifactId>
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java
new file mode 100644
index 00000000000..c3dc17d9e5a
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMEventListenerPluginLifecycle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+
+import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.om.eventlistener.NoOpOMEventListener;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListener;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Integration test to verify OMEventListener plugin lifecycle within OzoneManager.
+ */
+@Timeout(300)
+public class TestOMEventListenerPluginLifecycle {
+
+ private static MiniOzoneCluster cluster = null;
+
+ @BeforeAll
+ public static void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(OZONE_ACL_ENABLED, true);
+ conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
+
+ // Configure NoOpOMEventListener
+ conf.set("ozone.om.plugin.destination.foo", "enabled");
+ conf.set("ozone.om.plugin.destination.foo.classname",
NoOpOMEventListener.class.getName());
+
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .build();
+ cluster.waitForClusterToBeReady();
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testPluginLifecycleWithRestart() throws Exception {
+ OzoneManager om = cluster.getOzoneManager();
+ OMEventListenerPluginManager pluginManager =
om.getEventListenerPluginManager();
+
+ List<OMEventListener> plugins = pluginManager.getLoaded();
+ Assertions.assertEquals(1, plugins.size(), "Should have 1 plugin loaded");
+
+ NoOpOMEventListener plugin = (NoOpOMEventListener) plugins.get(0);
+
+ Assertions.assertTrue(plugin.isInitialized(), "Plugin should be
initialized");
+ Assertions.assertTrue(plugin.isStarted(), "Plugin should be started");
+ Assertions.assertFalse(plugin.isStopped(), "Plugin should not be stopped");
+
+ // Restart OM to verify restart lifecycle
+ cluster.restartOzoneManager();
+
+ // Get the new OM instance and plugin
+ om = cluster.getOzoneManager();
+ pluginManager = om.getEventListenerPluginManager();
+
+ plugins = pluginManager.getLoaded();
+ Assertions.assertEquals(1, plugins.size(), "Should have 1 plugin loaded
after restart");
+ plugin = (NoOpOMEventListener) plugins.get(0);
+
+ Assertions.assertTrue(plugin.isInitialized(), "Plugin should be
initialized after restart");
+ Assertions.assertTrue(plugin.isStarted(), "Plugin should be started after
restart");
+ Assertions.assertFalse(plugin.isStopped(), "Plugin should not be stopped
after restart");
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
similarity index 57%
copy from
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
copy to
hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
index 5f20301d458..82c7af149e8 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NoOpOMEventListener.java
@@ -20,13 +20,38 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
/**
- * Interface for event listener plugin implementations.
+ * A no-op implementation of {@link OMEventListener} for testing and as a template.
*/
-public interface OMEventListener {
+public class NoOpOMEventListener implements OMEventListener {
- void initialize(OzoneConfiguration conf, OMEventListenerPluginContext
pluginContext);
+ private boolean initialized = false;
+ private boolean started = false;
+ private boolean stopped = false;
- void start();
+ @Override
+ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext
pluginContext) {
+ initialized = true;
+ }
- void shutdown();
+ @Override
+ public void start() {
+ started = true;
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
}
diff --git
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
index da0df189b2e..53724129b0d 100644
---
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
@@ -88,7 +88,7 @@ public void start() {
}
@Override
- public void shutdown() {
+ public void stop() {
try {
kafkaClient.shutdown();
} catch (IOException ex) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 7a9d66f86df..93a601f7d08 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -231,6 +231,7 @@
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.audit.OMSystemAction;
import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -398,6 +399,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
private VolumeManager volumeManager;
private BucketManager bucketManager;
private KeyManager keyManager;
+ private OMEventListenerPluginManager eventListenerPluginManager;
private PrefixManagerImpl prefixManager;
private final UpgradeFinalizer<OzoneManager> upgradeFinalizer;
private ExecutorService edekCacheLoader = null;
@@ -969,6 +971,8 @@ private void instantiateServices(boolean withNewSnapshot)
throws IOException {
prefixManager = new PrefixManagerImpl(this, metadataManager, true);
keyManager = new KeyManagerImpl(this, scmClient, configuration,
perfMetrics);
+ eventListenerPluginManager = new OMEventListenerPluginManager(this,
+ configuration);
// If authorizer is not initialized or the authorizer is Native
// re-initialize the authorizer, else for non-native authorizer
// like ranger we can reuse previous value if it is initialized
@@ -1688,6 +1692,10 @@ public KeyManager getKeyManager() {
return keyManager;
}
+ public OMEventListenerPluginManager getEventListenerPluginManager() {
+ return eventListenerPluginManager;
+ }
+
@VisibleForTesting
public OMStorage getOmStorage() {
return omStorage;
@@ -1865,6 +1873,7 @@ public void start() throws IOException {
}
keyManager.start(configuration);
+ eventListenerPluginManager.start(configuration);
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
@@ -1913,6 +1922,7 @@ public void restart() throws IOException {
metadataManager.start(configuration);
keyManager.start(configuration);
+ eventListenerPluginManager.start(configuration);
startSecretManagerIfNecessary();
// Set metrics and start metrics back ground thread
@@ -2399,6 +2409,9 @@ public boolean stop() {
isOmGrpcServerRunning = false;
}
keyManager.stop();
+ if (eventListenerPluginManager != null) {
+ eventListenerPluginManager.stop();
+ }
stopSecretManager();
if (scmTopologyClient != null) {
@@ -4065,6 +4078,7 @@ TermIndex installCheckpoint(String leaderId, Path
checkpointLocation)
return installCheckpoint(leaderId, checkpointLocation, checkpointTrxnInfo);
}
+ @SuppressWarnings("methodlength")
TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
TransactionInfo checkpointTrxnInfo) throws Exception {
long startTime = Time.monotonicNow();
@@ -4073,6 +4087,9 @@ TermIndex installCheckpoint(String leaderId, Path
checkpointLocation,
try {
// Stop Background services
keyManager.stop();
+ if (eventListenerPluginManager != null) {
+ eventListenerPluginManager.stop();
+ }
stopSecretManager();
stopTrashEmptier();
omSnapshotManager.invalidateCache();
@@ -4085,6 +4102,7 @@ TermIndex installCheckpoint(String leaderId, Path
checkpointLocation,
"installing the new checkpoint.");
// Stop the checkpoint install process and restart the services.
keyManager.start(configuration);
+ eventListenerPluginManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
throw e;
@@ -4163,6 +4181,7 @@ TermIndex installCheckpoint(String leaderId, Path
checkpointLocation,
} else {
// OM DB is not stopped. Start the services.
keyManager.start(configuration);
+ eventListenerPluginManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
@@ -4388,6 +4407,7 @@ private void reloadOMState() throws IOException {
// Restart required services
metadataManager.start(configuration);
keyManager.start(configuration);
+ eventListenerPluginManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
index 1c5ff534335..79674dc20ff 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
@@ -46,7 +46,7 @@ public List<OMEventListener> getLoaded() {
return plugins;
}
- public void startAll() {
+ public void start(OzoneConfiguration conf) {
for (OMEventListener plugin : plugins) {
try {
plugin.start();
@@ -57,10 +57,10 @@ public void startAll() {
}
}
- public void shutdownAll() {
+ public void stop() {
for (OMEventListener plugin : plugins) {
try {
- plugin.shutdown();
+ plugin.stop();
} catch (Exception ex) {
LOG.error("Failed to shut down event listener plugin {}",
plugin.getClass().getName(), ex);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
index 346f5c1c79f..29ca8d48883 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
@@ -50,51 +50,20 @@ static List<String>
getLoadedPlugins(OMEventListenerPluginManager pluginManager)
return loadedClasses;
}
- private static class BrokenFooPlugin {
+ private static class BrokenPlugin {
}
/**
- * A dummy plugin implementation for testing.
+ * Dummy plugin implementation for testing.
*/
- public static class FooPlugin implements OMEventListener {
-
- private boolean initialized = false;
- private boolean started = false;
- private boolean shutdown = false;
-
- @Override
- public void initialize(OzoneConfiguration conf,
OMEventListenerPluginContext pluginContext) {
- initialized = true;
- }
-
- @Override
- public void start() {
- started = true;
- }
-
- @Override
- public void shutdown() {
- shutdown = true;
- }
-
- public boolean isInitialized() {
- return initialized;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public boolean isShutdown() {
- return shutdown;
- }
+ public static class FooPlugin extends NoOpOMEventListener {
}
/**
* Another dummy plugin implementation for testing.
*/
- public static class BarPlugin extends FooPlugin {
+ public static class BarPlugin extends NoOpOMEventListener {
}
@Test
@@ -154,7 +123,7 @@ public void testPluginClassDoesNotExist() {
public void testPluginClassDoesNotImplementInterface() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set("ozone.om.plugin.destination.foo", "enabled");
- conf.set("ozone.om.plugin.destination.foo.classname",
BrokenFooPlugin.class.getName());
+ conf.set("ozone.om.plugin.destination.foo.classname",
BrokenPlugin.class.getName());
OMEventListenerPluginManager pluginManager = new
OMEventListenerPluginManager(ozoneManager, conf);
@@ -162,4 +131,25 @@ public void testPluginClassDoesNotImplementInterface() {
Arrays.asList(),
getLoadedPlugins(pluginManager));
}
+
+ @Test
+ public void testLifecycle() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set("ozone.om.plugin.destination.foo", "enabled");
+ conf.set("ozone.om.plugin.destination.foo.classname",
FooPlugin.class.getName());
+
+ OMEventListenerPluginManager pluginManager = new
OMEventListenerPluginManager(ozoneManager, conf);
+ FooPlugin plugin = (FooPlugin) pluginManager.getLoaded().get(0);
+
+ Assertions.assertTrue(plugin.isInitialized());
+ Assertions.assertFalse(plugin.isStarted());
+ Assertions.assertFalse(plugin.isStopped());
+
+ pluginManager.start(conf);
+ Assertions.assertTrue(plugin.isStarted());
+ Assertions.assertFalse(plugin.isStopped());
+
+ pluginManager.stop();
+ Assertions.assertTrue(plugin.isStopped());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]