This is an automated email from the ASF dual-hosted git repository.
ChenSammi pushed a commit to branch HDDS-13513_Event_Notification_FeatureBranch
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to
refs/heads/HDDS-13513_Event_Notification_FeatureBranch by this push:
new bd884590893 HDDS-14006. EventNotification: create a plugin impl which
publishes events to Kafka (#10096)
bd884590893 is described below
commit bd8845908931e11c16b018e51dabff8db8d4e45a
Author: gardenia <[email protected]>
AuthorDate: Mon May 11 09:22:00 2026 +0100
HDDS-14006. EventNotification: create a plugin impl which publishes
events to Kafka (#10096)
---
.../OMEventListenerPluginContext.java | 12 ++
hadoop-ozone/dist/src/main/license/bin/LICENSE.txt | 2 +
hadoop-ozone/dist/src/main/license/jar-report.txt | 3 +
.../apache/hadoop/ozone/om/OMMetadataManager.java | 11 ++
hadoop-ozone/ozone-manager-plugins/pom.xml | 81 +++++++++
.../OMEventListenerKafkaPublisher.java | 186 +++++++++++++++++++++
.../eventlistener/OMEventListenerLedgerPoller.java | 148 ++++++++++++++++
.../OMEventListenerLedgerPollerSeekPosition.java | 63 +++++++
.../ozone/om/eventlistener/package-info.java} | 9 +-
.../TestOMEventListenerKafkaPublisher.java | 158 +++++++++++++++++
hadoop-ozone/ozone-manager/pom.xml | 5 +
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 52 ++++++
.../OMEventListenerPluginContextImpl.java | 24 ++-
hadoop-ozone/pom.xml | 1 +
pom.xml | 12 ++
15 files changed, 757 insertions(+), 10 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
index 41b349754a7..f61b5efa612 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
@@ -17,10 +17,22 @@
package org.apache.hadoop.ozone.om.eventlistener;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
/**
* A narrow set of functionality we are ok with exposing to plugin
* implementations.
*/
public interface OMEventListenerPluginContext {
+ boolean isLeaderReady();
+
+ // TODO: should we allow plugins to pass in maxResults or just limit
+ // them to some predefined value for safety? e.g. 10K
+ List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int
maxResults) throws IOException;
+
+ // XXX: this probably doesn't belong here
+ String getThreadNamePrefix();
}
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index fda1e61820a..2e12ec5d1c7 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -262,6 +262,7 @@ CDDL 1.1 + GPLv2 with classpath exception
Apache License 2.0
=====================
+ at.yawk.lz4:lz4-java
ch.qos.reload4j:reload4j
com.amazonaws:aws-java-sdk-core
com.amazonaws:aws-java-sdk-kms
@@ -384,6 +385,7 @@ Apache License 2.0
org.apache.hadoop:hadoop-shaded-guava
org.apache.hadoop:hadoop-shaded-protobuf_3_25
org.apache.httpcomponents:httpcore
+ org.apache.kafka:kafka-clients
org.apache.kerby:kerb-admin
org.apache.kerby:kerb-client
org.apache.kerby:kerb-common
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt
b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 94827486c8e..e40eea85e85 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -159,6 +159,7 @@ share/ozone/lib/json-simple.jar
share/ozone/lib/jsp-api.jar
share/ozone/lib/jspecify.jar
share/ozone/lib/jsr311-api.jar
+share/ozone/lib/kafka-clients.jar
share/ozone/lib/kerb-core.jar
share/ozone/lib/kerb-crypto.jar
share/ozone/lib/kerb-util.jar
@@ -170,6 +171,7 @@ share/ozone/lib/kotlin-stdlib.jar
share/ozone/lib/listenablefuture-empty-to-avoid-conflict-with-guava.jar
share/ozone/lib/log4j-api.jar
share/ozone/lib/log4j-core.jar
+share/ozone/lib/lz4-java.jar
share/ozone/lib/metrics-core.jar
share/ozone/lib/netty-buffer.Final.jar
share/ozone/lib/netty-codec.Final.jar
@@ -227,6 +229,7 @@ share/ozone/lib/ozone-insight.jar
share/ozone/lib/ozone-interface-client.jar
share/ozone/lib/ozone-interface-storage.jar
share/ozone/lib/ozone-manager.jar
+share/ozone/lib/ozone-manager-plugins.jar
share/ozone/lib/ozone-multitenancy-ranger.jar
share/ozone/lib/ozone-reconcodegen.jar
share/ozone/lib/ozone-recon.jar
diff --git
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index abd63f09c0e..e956e7afb14 100644
---
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -345,6 +345,17 @@ ListSnapshotResponse listSnapshot(
List<OmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;
+ /**
+ * Returns a list of operation info objects.
+ *
+ * @param startKey the start key determines where to start listing
+ * from, this key is excluded from the result.
+ * @param maxResults the maximum number of results to return.
+ * @return a list of {@link OmCompletedRequestInfo}
+ * @throws IOException
+ */
+ List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int
maxResults) throws IOException;
+
/**
* Returns the names of up to {@code count} open keys whose age is
* greater than or equal to {@code expireThreshold}.
diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml
b/hadoop-ozone/ozone-manager-plugins/pom.xml
new file mode 100644
index 00000000000..95abfcf4ef6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>ozone-manager-plugins</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Apache Ozone Manager Plugins</name>
+ <properties>
+ <classpath.skip>false</classpath.skip>
+ <file.encoding>UTF-8</file.encoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-server-framework</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone-interface-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <proc>none</proc>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
new file mode 100644
index 00000000000..da0df189b2e
--- /dev/null
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of OMEventListener which uses the
+ * OMEventListenerLedgerPoller as a building block to periodically poll/consume
+ * completed operations, serialize them to an S3 schema and produce them
+ * to a Kafka topic.
+ */
+public class OMEventListenerKafkaPublisher implements OMEventListener {
+ public static final Logger LOG =
LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);
+
+ private static final String KAFKA_CONFIG_PREFIX = "ozone.om.plugin.kafka.";
+ private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;
+
+ private OMEventListenerLedgerPoller ledgerPoller;
+ private KafkaClientWrapper kafkaClient;
+ private OMEventListenerLedgerPollerSeekPosition seekPosition;
+
+ @Override
+ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext
pluginContext) {
+ Map<String, String> kafkaPropsMap =
conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX);
+ Properties kafkaProps = new Properties();
+ kafkaProps.putAll(kafkaPropsMap);
+
+ this.kafkaClient = new KafkaClientWrapper(kafkaProps);
+
+ // TODO: these constants should be read from config
+ long kafkaServiceInterval = 2 * 1000;
+ long kafkaServiceTimeout = 300 * 1000;
+
+ this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
+
+ LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
+ "serviceTimeout={}, seekPosition={}",
+ kafkaServiceInterval, kafkaServiceTimeout,
+ seekPosition);
+
+ this.ledgerPoller = new OMEventListenerLedgerPoller(
+ kafkaServiceInterval, TimeUnit.MILLISECONDS,
+ COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE,
+ kafkaServiceTimeout, pluginContext, conf,
+ seekPosition,
+ this::handleCompletedRequest);
+ }
+
+ @Override
+ public void start() {
+ try {
+ kafkaClient.initialize();
+ } catch (IOException ex) {
+ LOG.error("Failure initializing kafka client", ex);
+ return;
+ }
+
+ ledgerPoller.start();
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ kafkaClient.shutdown();
+ } catch (IOException ex) {
+ LOG.error("Failure shutting down kafka client", ex);
+ }
+
+ ledgerPoller.shutdown();
+ }
+
+ // callback called by OMEventListenerLedgerPoller
+ public void handleCompletedRequest(OmCompletedRequestInfo
completedRequestInfo) {
+ LOG.debug("Processing {}", completedRequestInfo);
+
+ // stub event until we implement a strategy to convert the events to
+ // a user facing schema (e.g. S3)
+ String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}",
+ completedRequestInfo.getVolumeName(),
+ completedRequestInfo.getBucketName(),
+ completedRequestInfo.getKeyName(),
+ String.valueOf(completedRequestInfo.getCmdType()));
+
+ LOG.debug("Sending {}", event);
+
+ try {
+ kafkaClient.send(event);
+ } catch (IOException ex) {
+ LOG.error("Failure to send event {}", event, ex);
+ return;
+ }
+
+ // we can update the seek position
+ seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex()));
+ }
+
+ static class KafkaClientWrapper {
+ public static final Logger LOG =
LoggerFactory.getLogger(KafkaClientWrapper.class);
+
+ private final String topic;
+ private final Properties kafkaProps;
+
+ private KafkaProducer<String, String> producer;
+
+ KafkaClientWrapper(Properties kafkaProps) {
+ this.topic = (String) kafkaProps.get("topic");
+ this.kafkaProps = kafkaProps;
+ }
+
+ public void initialize() throws IOException {
+ LOG.info("Initializing kafka client for topic {}", topic);
+ this.producer = new KafkaProducer<>(kafkaProps);
+
+ ensureTopicExists();
+ }
+
+ public void shutdown() throws IOException {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ public void send(String message) throws IOException {
+ if (producer != null) {
+ LOG.debug("Producing event {}", message);
+ ProducerRecord<String, String> producerRecord =
+ new ProducerRecord<>(topic, message);
+ try {
+ // TODO: Sequential blocking for every event ensures at-least-once
delivery
+ // but limits throughput. Consider batching async sends and calling
+ // producer.flush() before updating the seek position for
high-volume use cases.
+ producer.send(producerRecord).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while sending message to Kafka",
e);
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to send message to Kafka", e);
+ }
+ } else {
+ LOG.warn("Producing event {} [KAFKA DOWN]", message);
+ throw new IOException("Kafka producer is not initialized");
+ }
+ }
+
+ private void ensureTopicExists() {
+ try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
+ LOG.info("Creating kafka topic: {}", this.topic);
+ NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1);
+ // TODO: handle topic already exists failure
+ adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ adminClient.close();
+ } catch (Exception ex) {
+ LOG.error("Failed to create topic: {}", this.topic, ex);
+ }
+ }
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
new file mode 100644
index 00000000000..2ae3a2ff70c
--- /dev/null
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class for implementations of OMEventListener. It
+ * runs a background service that reads the latest completed operations
+ * and hands each of them to a callback method.
+ */
+public class OMEventListenerLedgerPoller extends BackgroundService {
+ public static final Logger LOG =
LoggerFactory.getLogger(OMEventListenerLedgerPoller.class);
+
+ private static final int MAX_RESULTS = 10_000;
+
+ private final AtomicBoolean suspended;
+ private final AtomicLong runCount;
+ private final AtomicLong successRunCount;
+ private final OMEventListenerPluginContext pluginContext;
+ private final OMEventListenerLedgerPollerSeekPosition seekPosition;
+ private final Consumer<OmCompletedRequestInfo> callback;
+
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ public OMEventListenerLedgerPoller(long interval, TimeUnit unit,
+ int poolSize, long serviceTimeout,
+ OMEventListenerPluginContext pluginContext,
+ OzoneConfiguration configuration,
+ OMEventListenerLedgerPollerSeekPosition seekPosition,
+ Consumer<OmCompletedRequestInfo> callback) {
+
+ super("OMEventListenerLedgerPoller",
+ interval,
+ unit,
+ poolSize,
+ serviceTimeout, pluginContext.getThreadNamePrefix());
+
+ this.suspended = new AtomicBoolean(false);
+ this.runCount = new AtomicLong(0);
+ this.successRunCount = new AtomicLong(0);
+ this.pluginContext = pluginContext;
+ this.seekPosition = seekPosition;
+ this.callback = callback;
+ }
+
+ private boolean shouldRun() {
+ return pluginContext.isLeaderReady() && !suspended.get();
+ }
+
+ /**
+ * Suspend the service.
+ */
+ @VisibleForTesting
+ public void suspend() {
+ suspended.set(true);
+ }
+
+ /**
+ * Resume the service if suspended.
+ */
+ @VisibleForTesting
+ public void resume() {
+ suspended.set(false);
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new
OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask());
+ return queue;
+ }
+
+ public AtomicLong getRunCount() {
+ return runCount;
+ }
+
+ private class CompletedRequestInfoConsumerTask implements BackgroundTask {
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public BackgroundTaskResult call() {
+ if (shouldRun()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running OMEventListenerLedgerPoller");
+ }
+ if (runCount.get() == 0) {
+ seekPosition.set(seekPosition.initSeekPosition());
+ }
+ getRunCount().incrementAndGet();
+
+ try {
+ String startKeyStr = seekPosition.get();
+ Long startKey = StringUtils.isNotBlank(startKeyStr) ?
Long.valueOf(startKeyStr) : null;
+ for (OmCompletedRequestInfo requestInfo :
pluginContext.listCompletedRequestInfo(
+ startKey, MAX_RESULTS)) {
+ callback.accept(requestInfo);
+ }
+ successRunCount.incrementAndGet();
+ } catch (IOException e) {
+ LOG.error("Error while running completed operation consumer " +
+ "background task. Will retry at next run.", e);
+ }
+ } else {
+ runCount.set(0);
+ }
+
+ // Placeholder: this task has no result data to report, so return an empty result.
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ }
+
+ public long getSuccessfulRunCount() {
+ return successRunCount.get();
+ }
+}
diff --git
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
new file mode 100644
index 00000000000..bccbda1e2a7
--- /dev/null
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to get/set the seek position used by the
+ * OMEventListenerLedgerPoller.
+ *
+ * XXX: the seek position should be persisted (and ideally distributed to
+ * all OMs) but at the moment it only lives in memory
+ */
+public class OMEventListenerLedgerPollerSeekPosition {
+ public static final Logger LOG =
LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class);
+
+ private final AtomicReference<String> seekPosition;
+
+ public OMEventListenerLedgerPollerSeekPosition() {
+ this.seekPosition = new AtomicReference(initSeekPosition());
+ }
+
+ // TODO: load this from persistent storage
+ public String initSeekPosition() {
+ return null;
+ }
+
+ public String get() {
+ return seekPosition.get();
+ }
+
+ public void set(String val) {
+ LOG.debug("Setting seek position {}", val);
+ // NOTE: this in-memory view of the seek position needs to be kept
+ // up to date because the OMEventListenerLedgerPoller has a
+ // reference to it
+ seekPosition.set(val);
+ }
+
+ @Override
+ public String toString() {
+ return "OMEventListenerLedgerPollerSeekPosition{" +
+ "seekPosition='" + seekPosition.get() + "'" +
+ '}';
+ }
+}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
similarity index 86%
copy from
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
copy to
hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
index 41b349754a7..dbda5e337cc 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
+++
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
@@ -15,12 +15,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.ozone.om.eventlistener;
-
/**
- * A narrow set of functionality we are ok with exposing to plugin
- * implementations.
+ * This package contains classes for the OM Event Listener implementation.
*/
-public interface OMEventListenerPluginContext {
-
-}
+package org.apache.hadoop.ozone.om.eventlistener;
diff --git
a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
new file mode 100644
index 00000000000..761f8f3c117
--- /dev/null
+++
b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests {@link OMEventListenerKafkaPublisher}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOMEventListenerKafkaPublisher {
+
+ private static final String VOLUME_NAME = "vol1";
+ private static final String BUCKET_NAME = "bucket1";
+
+ @Mock
+ private OMEventListenerPluginContext pluginContext;
+
+ // helper to create json key/val string for non exhaustive JSON
+ // attribute checking
+ private static String toJsonKeyVal(String key, String val) {
+ return new StringBuilder()
+ .append('\"')
+ .append(key)
+ .append('\"')
+ .append(':')
+ .append('\"')
+ .append(val)
+ .append('\"')
+ .toString();
+ }
+
+ private static OmCompletedRequestInfo buildCompletedRequestInfo(
+ long trxLogIndex, Type cmdType, String keyName, OperationArgs
opArgs) {
+
+ return new OmCompletedRequestInfo.Builder()
+ .setTrxLogIndex(trxLogIndex)
+ .setCmdType(cmdType)
+ .setVolumeName(VOLUME_NAME)
+ .setBucketName(BUCKET_NAME)
+ .setKeyName(keyName)
+ .setCreationTime(Time.now())
+ .setOpArgs(opArgs)
+ .build();
+ }
+
+ private List<String> captureEventsProducedByOperation(OmCompletedRequestInfo
op, int expectEvents)
+ throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set("ozone.om.plugin.kafka.topic", "abc");
+
+ List<String> events = new ArrayList<>();
+
+ OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher();
+ try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper>
mockedKafkaClientWrapper =
+
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
+
+ plugin.initialize(conf, pluginContext);
+ plugin.handleCompletedRequest(op);
+
+ OMEventListenerKafkaPublisher.KafkaClientWrapper mock =
mockedKafkaClientWrapper.constructed().get(0);
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+ verify(mock, times(expectEvents)).send(argument.capture());
+
+ events.addAll(argument.getAllValues());
+ }
+
+ return events;
+ }
+
+ @Test
+ public void testCreateKeyRequestProducesS3CreatedEvent() throws
InterruptedException, IOException {
+ OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(1L,
Type.CreateKey, "some/key1",
+ new OperationArgs.NoArgs());
+
+ List<String> events = captureEventsProducedByOperation(createRequest, 1);
+ assertThat(events).hasSize(1);
+
+ assertThat(events.get(0))
+ .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1"))
+ .contains(toJsonKeyVal("type", "CreateKey"));
+ }
+
+ @Test
+ public void testCreateFileRequestProducesS3CreatedEvent() throws
InterruptedException, IOException {
+ boolean recursive = false;
+ boolean overwrite = true;
+
+ OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(2L,
Type.CreateFile, "some/key2",
+ new OperationArgs.CreateFileArgs(recursive, overwrite));
+
+ List<String> events = captureEventsProducedByOperation(createRequest, 1);
+ assertThat(events).hasSize(1);
+
+ assertThat(events.get(0))
+ .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2"))
+ .contains(toJsonKeyVal("type", "CreateFile"));
+ }
+
+ @Test
+ public void testCreateDirectoryRequestProducesS3CreatedEvent() throws
InterruptedException, IOException {
+ OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(3L,
Type.CreateDirectory, "some/key3",
+ new OperationArgs.NoArgs());
+
+ List<String> events = captureEventsProducedByOperation(createRequest, 1);
+ assertThat(events).hasSize(1);
+
+ assertThat(events.get(0))
+ .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3"))
+ .contains(toJsonKeyVal("type", "CreateDirectory"));
+ }
+
+ @Test
+ public void testRenameRequestProducesRenameKeyEvent() throws
InterruptedException, IOException {
+ OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(4L,
Type.RenameKey, "some/key4",
+ new OperationArgs.RenameKeyArgs("some/key_RENAMED"));
+
+ List<String> events = captureEventsProducedByOperation(renameRequest, 1);
+ assertThat(events).hasSize(1);
+
+ assertThat(events.get(0))
+ .contains(toJsonKeyVal("key", "vol1/bucket1/some/key4"))
+ .contains(toJsonKeyVal("type", "RenameKey"));
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/pom.xml
b/hadoop-ozone/ozone-manager/pom.xml
index 83dacbe4eb3..2ef5a410ff0 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -243,6 +243,11 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone-manager-plugins</artifactId>
+ <scope>runtime</scope>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index df3af67bc3b..a09cc3a8a2b 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1340,6 +1340,58 @@ private List<OmVolumeArgs> listAllVolumes(String prefix,
String startKey,
return result;
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<OmCompletedRequestInfo> listCompletedRequestInfo(final Long
startKey,
+ final int
maxResults)
+ throws IOException {
+ List<OmCompletedRequestInfo> results = new ArrayList<>();
+
+ Table.KeyValue<Long, OmCompletedRequestInfo> completedRequestInfoRow;
+ try (TableIterator<Long, ? extends Table.KeyValue<Long,
OmCompletedRequestInfo>>
+ tableIterator = getCompletedRequestInfoTable().iterator()) {
+
+ boolean skipFirst = false;
+ if (startKey != null) {
+ // TODO: confirm where seek() lands when startKey is no longer
+ // present in the table: at the end of the table, or on the
+ // first key > startKey?
+ tableIterator.seek(startKey);
+ skipFirst = true;
+ }
+
+ while (tableIterator.hasNext() && results.size() < maxResults) {
+ completedRequestInfoRow = tableIterator.next();
+ // On the first iteration after a seek, the row under the
+ // iterator is consumed here and is never added to the
+ // results.
+ if (skipFirst) {
+ skipFirst = false;
+ // NOTE: the skip is only valid when the row found is the
+ // one we sought to, hence the equality check below.
+ if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) {
+ // When we have a startKey we expect the first row returned
+ // to be that startKey. If it is not, we infer that the
+ // startKey was already cleaned up and therefore some
+ // records may have been missed; this needs to be flagged
+ // to the caller.
+ // TODO: throw a custom exception here (instead of a plain
+ // IOException) so that callers can handle this case
+ // explicitly.
+ throw new IOException(
+ "Missing rows - start key not found (startKey=" + startKey
+ + ", foundKey=" + completedRequestInfoRow.getKey() + ")");
+ }
+ } else {
+ results.add(completedRequestInfoRow.getValue());
+ }
+ }
+ }
+ return results;
+ }
+
private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
throws OMException {
try {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
index 12573c62569..e0a805f0afc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
@@ -17,19 +17,37 @@
package org.apache.hadoop.ozone.om.eventlistener;
+import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
/**
* A narrow set of functionality we are ok with exposing to plugin
* implementations.
*/
public final class OMEventListenerPluginContextImpl implements
OMEventListenerPluginContext {
- private final OzoneManager ozoneManager; // NOPMD: unused. will be used soon
+ private final OzoneManager ozoneManager;
public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) {
this.ozoneManager = ozoneManager;
}
- // TODO: fill this out with capabilities we would like to expose to
- // plugin implementations.
+ @Override
+ public boolean isLeaderReady() {
+ return ozoneManager.isLeaderReady();
+ }
+
+ // TODO: decide whether plugins may choose maxResults freely or whether
+ // it should be capped at a predefined limit (e.g. 10K) for safety.
+ @Override
+ public List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey,
int maxResults) throws IOException {
+ return
ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey,
maxResults);
+ }
+
+ // TODO: this accessor feels out of place in the plugin context; revisit
+ @Override
+ public String getThreadNamePrefix() {
+ return ozoneManager.getThreadNamePrefix();
+ }
}
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 417f05064ef..4b42e075417 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -46,6 +46,7 @@
<module>mini-cluster</module>
<module>multitenancy-ranger</module>
<module>ozone-manager</module>
+ <module>ozone-manager-plugins</module>
<module>ozonefs</module>
<module>ozonefs-common</module>
<module>recon</module>
diff --git a/pom.xml b/pom.xml
index 7db5ff9b318..098e99c3273 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
<jsp-api.version>2.1</jsp-api.version>
<jsr311-api.version>1.1.1</jsr311-api.version>
<junit5.version>5.14.2</junit5.version>
+ <kafka.version>3.9.2</kafka.version>
<kerby.version>1.0.1</kerby.version>
<kotlin.version>1.9.25</kotlin.version>
<license-maven-plugin.version>2.7.1</license-maven-plugin.version>
@@ -995,6 +996,11 @@
<artifactId>httpcore-nio</artifactId>
<version>${httpcore.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-core</artifactId>
@@ -1272,6 +1278,11 @@
<version>${ozone.version}</version>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>ozone-manager-plugins</artifactId>
+ <version>${ozone.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-mini-cluster</artifactId>
@@ -2144,6 +2155,7 @@
-->
<ignoredNonTestScopedDependency>com.fasterxml.jackson.core:jackson-databind:jar</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.apache.commons:commons-compress:jar</ignoredNonTestScopedDependency>
+
<ignoredNonTestScopedDependency>org.apache.hadoop:hadoop-common:jar</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.apache.ozone:hdds-client:jar</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.apache.ozone:ozone-interface-client:jar</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.glassfish.jersey.core:jersey-common:jar</ignoredNonTestScopedDependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]