This is an automated email from the ASF dual-hosted git repository.

ChenSammi pushed a commit to branch HDDS-13513_Event_Notification_FeatureBranch
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-13513_Event_Notification_FeatureBranch by this push:
     new bd884590893 HDDS-14006. EventNotification: create a plugin impl which 
publishes events to Kafka (#10096)
bd884590893 is described below

commit bd8845908931e11c16b018e51dabff8db8d4e45a
Author: gardenia <[email protected]>
AuthorDate: Mon May 11 09:22:00 2026 +0100

    HDDS-14006. EventNotification: create a plugin impl which publishes 
events to Kafka (#10096)
---
 .../OMEventListenerPluginContext.java              |  12 ++
 hadoop-ozone/dist/src/main/license/bin/LICENSE.txt |   2 +
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   3 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  11 ++
 hadoop-ozone/ozone-manager-plugins/pom.xml         |  81 +++++++++
 .../OMEventListenerKafkaPublisher.java             | 186 +++++++++++++++++++++
 .../eventlistener/OMEventListenerLedgerPoller.java | 148 ++++++++++++++++
 .../OMEventListenerLedgerPollerSeekPosition.java   |  63 +++++++
 .../ozone/om/eventlistener/package-info.java}      |   9 +-
 .../TestOMEventListenerKafkaPublisher.java         | 158 +++++++++++++++++
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  52 ++++++
 .../OMEventListenerPluginContextImpl.java          |  24 ++-
 hadoop-ozone/pom.xml                               |   1 +
 pom.xml                                            |  12 ++
 15 files changed, 757 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
index 41b349754a7..f61b5efa612 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
@@ -17,10 +17,22 @@
 
 package org.apache.hadoop.ozone.om.eventlistener;
 
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
 /**
  * A narrow set of functionality we are ok with exposing to plugin
  * implementations.
  */
 public interface OMEventListenerPluginContext {
 
+  boolean isLeaderReady();
+
+  // TODO: should we allow plugins to pass in maxResults or just limit
+  // them to some predefined value for safety?  e.g. 10K
+  List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int 
maxResults) throws IOException;
+
+  // XXX: this probably doesn't belong here
+  String getThreadNamePrefix();
 }
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt 
b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index fda1e61820a..2e12ec5d1c7 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -262,6 +262,7 @@ CDDL 1.1 + GPLv2 with classpath exception
 Apache License 2.0
 =====================
 
+   at.yawk.lz4:lz4-java
    ch.qos.reload4j:reload4j
    com.amazonaws:aws-java-sdk-core
    com.amazonaws:aws-java-sdk-kms
@@ -384,6 +385,7 @@ Apache License 2.0
    org.apache.hadoop:hadoop-shaded-guava
    org.apache.hadoop:hadoop-shaded-protobuf_3_25
    org.apache.httpcomponents:httpcore
+   org.apache.kafka:kafka-clients
    org.apache.kerby:kerb-admin
    org.apache.kerby:kerb-client
    org.apache.kerby:kerb-common
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt 
b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 94827486c8e..e40eea85e85 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -159,6 +159,7 @@ share/ozone/lib/json-simple.jar
 share/ozone/lib/jsp-api.jar
 share/ozone/lib/jspecify.jar
 share/ozone/lib/jsr311-api.jar
+share/ozone/lib/kafka-clients.jar
 share/ozone/lib/kerb-core.jar
 share/ozone/lib/kerb-crypto.jar
 share/ozone/lib/kerb-util.jar
@@ -170,6 +171,7 @@ share/ozone/lib/kotlin-stdlib.jar
 share/ozone/lib/listenablefuture-empty-to-avoid-conflict-with-guava.jar
 share/ozone/lib/log4j-api.jar
 share/ozone/lib/log4j-core.jar
+share/ozone/lib/lz4-java.jar
 share/ozone/lib/metrics-core.jar
 share/ozone/lib/netty-buffer.Final.jar
 share/ozone/lib/netty-codec.Final.jar
@@ -227,6 +229,7 @@ share/ozone/lib/ozone-insight.jar
 share/ozone/lib/ozone-interface-client.jar
 share/ozone/lib/ozone-interface-storage.jar
 share/ozone/lib/ozone-manager.jar
+share/ozone/lib/ozone-manager-plugins.jar
 share/ozone/lib/ozone-multitenancy-ranger.jar
 share/ozone/lib/ozone-reconcodegen.jar
 share/ozone/lib/ozone-recon.jar
diff --git 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index abd63f09c0e..e956e7afb14 100644
--- 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -345,6 +345,17 @@ ListSnapshotResponse listSnapshot(
   List<OmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
 
+  /**
+   * Returns a list of operation info objects.
+   *
+   * @param startKey the start key determines where to start listing
+   * from, this key is excluded from the result.
+   * @param maxResults the maximum number of results to return.
+   * @return a list of {@link OmCompletedRequestInfo}
+   * @throws IOException
+   */
+  List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, int 
maxResults) throws IOException;
+
   /**
    * Returns the names of up to {@code count} open keys whose age is
    * greater than or equal to {@code expireThreshold}.
diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml 
b/hadoop-ozone/ozone-manager-plugins/pom.xml
new file mode 100644
index 00000000000..95abfcf4ef6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.ozone</groupId>
+    <artifactId>ozone</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>ozone-manager-plugins</artifactId>
+  <version>2.2.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Apache Ozone Manager Plugins</name>
+  <properties>
+    <classpath.skip>false</classpath.skip>
+    <file.encoding>UTF-8</file.encoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-server-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-interface-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <proc>none</proc>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
new file mode 100644
index 00000000000..da0df189b2e
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an implementation of OMEventListener which uses the
+ * OMEventListenerLedgerPoller as a building block to periodically poll/consume
+ * completed operations, serialize them to an S3 schema and produce them
+ * to a Kafka topic.
+ */
+public class OMEventListenerKafkaPublisher implements OMEventListener {
+  public static final Logger LOG = 
LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);
+
+  private static final String KAFKA_CONFIG_PREFIX = "ozone.om.plugin.kafka.";
+  private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;
+
+  private OMEventListenerLedgerPoller ledgerPoller;
+  private KafkaClientWrapper kafkaClient;
+  private OMEventListenerLedgerPollerSeekPosition seekPosition;
+
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext 
pluginContext) {
+    Map<String, String> kafkaPropsMap = 
conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX);
+    Properties kafkaProps = new Properties();
+    kafkaProps.putAll(kafkaPropsMap);
+
+    this.kafkaClient = new KafkaClientWrapper(kafkaProps);
+
+    // TODO: these constants should be read from config
+    long kafkaServiceInterval = 2 * 1000;
+    long kafkaServiceTimeout = 300 * 1000;
+
+    this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
+
+    LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," +
+        "serviceTimeout={}, seekPosition={}",
+        kafkaServiceInterval, kafkaServiceTimeout,
+        seekPosition);
+
+    this.ledgerPoller = new OMEventListenerLedgerPoller(
+        kafkaServiceInterval, TimeUnit.MILLISECONDS,
+        COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE,
+        kafkaServiceTimeout, pluginContext, conf,
+        seekPosition,
+        this::handleCompletedRequest);
+  }
+
+  @Override
+  public void start() {
+    try {
+      kafkaClient.initialize();
+    } catch (IOException ex) {
+      LOG.error("Failure initializing kafka client", ex);
+      return;
+    }
+
+    ledgerPoller.start();
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      kafkaClient.shutdown();
+    } catch (IOException ex) {
+      LOG.error("Failure shutting down kafka client", ex);
+    }
+
+    ledgerPoller.shutdown();
+  }
+
+  // callback called by OMEventListenerLedgerPoller
+  public void handleCompletedRequest(OmCompletedRequestInfo 
completedRequestInfo) {
+    LOG.debug("Processing {}", completedRequestInfo);
+
+    // stub event until we implement a strategy to convert the events to
+    // a user facing schema (e.g. S3)
+    String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}",
+        completedRequestInfo.getVolumeName(),
+        completedRequestInfo.getBucketName(),
+        completedRequestInfo.getKeyName(),
+        String.valueOf(completedRequestInfo.getCmdType()));
+
+    LOG.debug("Sending {}", event);
+
+    try {
+      kafkaClient.send(event);
+    } catch (IOException ex) {
+      LOG.error("Failure to send event {}", event, ex);
+      return;
+    }
+
+    // we can update the seek position
+    seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex()));
+  }
+
+  static class KafkaClientWrapper {
+    public static final Logger LOG = 
LoggerFactory.getLogger(KafkaClientWrapper.class);
+
+    private final String topic;
+    private final Properties kafkaProps;
+
+    private KafkaProducer<String, String> producer;
+
+    KafkaClientWrapper(Properties kafkaProps) {
+      this.topic = (String) kafkaProps.get("topic");
+      this.kafkaProps = kafkaProps;
+    }
+
+    public void initialize() throws IOException {
+      LOG.info("Initializing kafka client for topic {}", topic);
+      this.producer = new KafkaProducer<>(kafkaProps);
+
+      ensureTopicExists();
+    }
+
+    public void shutdown() throws IOException {
+      if (producer != null) {
+        producer.close();
+      }
+    }
+
+    public void send(String message) throws IOException {
+      if (producer != null) {
+        LOG.debug("Producing event {}", message);
+        ProducerRecord<String, String> producerRecord =
+            new ProducerRecord<>(topic, message);
+        try {
+          // TODO: Sequential blocking for every event ensures at-least-once 
delivery
+          // but limits throughput. Consider batching async sends and calling
+          // producer.flush() before updating the seek position for 
high-volume use cases.
+          producer.send(producerRecord).get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while sending message to Kafka", 
e);
+        } catch (ExecutionException e) {
+          throw new IOException("Failed to send message to Kafka", e);
+        }
+      } else {
+        LOG.warn("Producing event {} [KAFKA DOWN]", message);
+        throw new IOException("Kafka producer is not initialized");
+      }
+    }
+
+    private void ensureTopicExists() {
+      try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
+        LOG.info("Creating kafka topic: {}", this.topic);
+        NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1);
+        // TODO: handle topic already exists failure
+        adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+        adminClient.close();
+      } catch (Exception ex) {
+        LOG.error("Failed to create topic: {}", this.topic, ex);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
new file mode 100644
index 00000000000..2ae3a2ff70c
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class which can be used by implementations of
+ * OMEventListener. It uses a background service to read the latest
+ * completed operations and hand them to a callback method.
+ */
+public class OMEventListenerLedgerPoller extends BackgroundService {
+  public static final Logger LOG = 
LoggerFactory.getLogger(OMEventListenerLedgerPoller.class);
+
+  private static final int MAX_RESULTS = 10_000;
+
+  private final AtomicBoolean suspended;
+  private final AtomicLong runCount;
+  private final AtomicLong successRunCount;
+  private final OMEventListenerPluginContext pluginContext;
+  private final OMEventListenerLedgerPollerSeekPosition seekPosition;
+  private final Consumer<OmCompletedRequestInfo> callback;
+
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public OMEventListenerLedgerPoller(long interval, TimeUnit unit,
+      int poolSize, long serviceTimeout,
+      OMEventListenerPluginContext pluginContext,
+      OzoneConfiguration configuration,
+      OMEventListenerLedgerPollerSeekPosition seekPosition,
+      Consumer<OmCompletedRequestInfo> callback) {
+
+    super("OMEventListenerLedgerPoller",
+          interval,
+          unit,
+          poolSize,
+          serviceTimeout, pluginContext.getThreadNamePrefix());
+
+    this.suspended = new AtomicBoolean(false);
+    this.runCount = new AtomicLong(0);
+    this.successRunCount = new AtomicLong(0);
+    this.pluginContext = pluginContext;
+    this.seekPosition = seekPosition;
+    this.callback = callback;
+  }
+
+  private boolean shouldRun() {
+    return pluginContext.isLeaderReady() && !suspended.get();
+  }
+
+  /**
+   * Suspend the service.
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended.
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new 
OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask());
+    return queue;
+  }
+
+  public AtomicLong getRunCount() {
+    return runCount;
+  }
+
+  private class CompletedRequestInfoConsumerTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      if (shouldRun()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Running OMEventListenerLedgerPoller");
+        }
+        if (runCount.get() == 0) {
+          seekPosition.set(seekPosition.initSeekPosition());
+        }
+        getRunCount().incrementAndGet();
+
+        try {
+          String startKeyStr = seekPosition.get();
+          Long startKey = StringUtils.isNotBlank(startKeyStr) ? 
Long.valueOf(startKeyStr) : null;
+          for (OmCompletedRequestInfo requestInfo : 
pluginContext.listCompletedRequestInfo(
+                  startKey, MAX_RESULTS)) {
+            callback.accept(requestInfo);
+          }
+          successRunCount.incrementAndGet();
+        } catch (IOException e) {
+          LOG.error("Error while running completed operation consumer " +
+              "background task. Will retry at next run.", e);
+        }
+      } else {
+        runCount.set(0);
+      }
+
+      // place holder by returning empty results of this call back.
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+
+  public long getSuccessfulRunCount() {
+    return successRunCount.get();
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
new file mode 100644
index 00000000000..bccbda1e2a7
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to get/set the seek position used by the
+ * OMEventListenerLedgerPoller.
+ *
+ * XXX: the seek position should be persisted (and ideally distributed to
+ * all OMs) but at the moment it only lives in memory
+ */
+public class OMEventListenerLedgerPollerSeekPosition {
+  public static final Logger LOG = 
LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class);
+
+  private final AtomicReference<String> seekPosition;
+
+  public OMEventListenerLedgerPollerSeekPosition() {
+    this.seekPosition = new AtomicReference(initSeekPosition());
+  }
+
+  // TODO: load this from persistent storage
+  public String initSeekPosition() {
+    return null;
+  }
+
+  public String get() {
+    return seekPosition.get();
+  }
+
+  public void set(String val) {
+    LOG.debug("Setting seek position {}", val);
+    // NOTE: this in-memory view of the seek position needs to be kept
+    // up to date because the OMEventListenerLedgerPoller has a
+    // reference to it
+    seekPosition.set(val);
+  }
+
+  @Override
+  public String toString() {
+    return "OMEventListenerLedgerPollerSeekPosition{" +
+        "seekPosition='" + seekPosition.get() + "'" +
+        '}';
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
similarity index 86%
copy from 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
copy to 
hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
index 41b349754a7..dbda5e337cc 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
+++ 
b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java
@@ -15,12 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.eventlistener;
-
 /**
- * A narrow set of functionality we are ok with exposing to plugin
- * implementations.
+ * This package contains classes for the OM Event Listener implementation.
  */
-public interface OMEventListenerPluginContext {
-
-}
+package org.apache.hadoop.ozone.om.eventlistener;
diff --git 
a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
 
b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
new file mode 100644
index 00000000000..761f8f3c117
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests {@link OMEventListenerKafkaPublisher}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOMEventListenerKafkaPublisher {
+
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+  @Mock
+  private OMEventListenerPluginContext pluginContext;
+
+  // helper to create json key/val string for non exhaustive JSON
+  // attribute checking
+  private static String toJsonKeyVal(String key, String val) {
+    return new StringBuilder()
+      .append('\"')
+      .append(key)
+      .append('\"')
+      .append(':')
+      .append('\"')
+      .append(val)
+      .append('\"')
+      .toString();
+  }
+
+  private static OmCompletedRequestInfo buildCompletedRequestInfo(
+          long trxLogIndex, Type cmdType, String keyName, OperationArgs 
opArgs) {
+
+    return new OmCompletedRequestInfo.Builder()
+        .setTrxLogIndex(trxLogIndex)
+        .setCmdType(cmdType)
+        .setVolumeName(VOLUME_NAME)
+        .setBucketName(BUCKET_NAME)
+        .setKeyName(keyName)
+        .setCreationTime(Time.now())
+        .setOpArgs(opArgs)
+        .build();
+  }
+
+  private List<String> captureEventsProducedByOperation(OmCompletedRequestInfo 
op, int expectEvents)
+          throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.kafka.topic", "abc");
+
+    List<String> events = new ArrayList<>();
+
+    OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher();
+    try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> 
mockedKafkaClientWrapper =
+             
mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
+
+      plugin.initialize(conf, pluginContext);
+      plugin.handleCompletedRequest(op);
+
+      OMEventListenerKafkaPublisher.KafkaClientWrapper mock = 
mockedKafkaClientWrapper.constructed().get(0);
+      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+      verify(mock, times(expectEvents)).send(argument.capture());
+
+      events.addAll(argument.getAllValues());
+    }
+
+    return events;
+  }
+
+  @Test
+  public void testCreateKeyRequestProducesS3CreatedEvent() throws 
InterruptedException, IOException {
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(1L, 
Type.CreateKey, "some/key1",
+        new OperationArgs.NoArgs());
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1"))
+        .contains(toJsonKeyVal("type", "CreateKey"));
+  }
+
+  @Test
+  public void testCreateFileRequestProducesS3CreatedEvent() throws 
InterruptedException, IOException {
+    boolean recursive = false;
+    boolean overwrite = true;
+
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(2L, 
Type.CreateFile, "some/key2",
+        new OperationArgs.CreateFileArgs(recursive, overwrite));
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2"))
+        .contains(toJsonKeyVal("type", "CreateFile"));
+  }
+
+  @Test
+  public void testCreateDirectoryRequestProducesS3CreatedEvent() throws 
InterruptedException, IOException {
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(3L, 
Type.CreateDirectory, "some/key3",
+        new OperationArgs.NoArgs());
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3"))
+        .contains(toJsonKeyVal("type", "CreateDirectory"));
+  }
+
+  @Test
+  public void testRenameRequestProducesRenameKeyEvent() throws 
InterruptedException, IOException {
+    OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(4L, 
Type.RenameKey, "some/key4",
+        new OperationArgs.RenameKeyArgs("some/key_RENAMED"));
+
+    List<String> events = captureEventsProducedByOperation(renameRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key4"))
+        .contains(toJsonKeyVal("type", "RenameKey"));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/pom.xml 
b/hadoop-ozone/ozone-manager/pom.xml
index 83dacbe4eb3..2ef5a410ff0 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -243,6 +243,11 @@
       <artifactId>netty-tcnative-boringssl-static</artifactId>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-manager-plugins</artifactId>
+      <scope>runtime</scope>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index df3af67bc3b..a09cc3a8a2b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1340,6 +1340,58 @@ private List<OmVolumeArgs> listAllVolumes(String prefix, 
String startKey,
     return result;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmCompletedRequestInfo> listCompletedRequestInfo(final Long 
startKey,
+                                                               final int 
maxResults)
+      throws IOException {
+    List<OmCompletedRequestInfo> results = new ArrayList<>();
+
+    Table.KeyValue<Long, OmCompletedRequestInfo> completedRequestInfoRow;
+    try (TableIterator<Long, ? extends Table.KeyValue<Long, 
OmCompletedRequestInfo>>
+            tableIterator = getCompletedRequestInfoTable().iterator()) {
+
+      boolean skipFirst = false;
+      if (startKey != null) {
+        // TODO: what happens if the seek position is no longer
+        // available?  Do we go to the end of the list
+        // or the first key > startKey?
+        tableIterator.seek(startKey);
+        skipFirst = true;
+      }
+
+      while (tableIterator.hasNext() && results.size() < maxResults) {
+        completedRequestInfoRow = tableIterator.next();
+        // this is the first loop iteration after the seek so we
+        // need to skip the record we seeked to (if it is still
+        // present)
+        if (skipFirst) {
+          skipFirst = false;
+          // NOTE: I'm assuming we only skip the first row when its key is
+          // equal to the requested startKey (hence the check below).
+          if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) {
+            // when we have a startKey we expect the first result
+            // to be that startKey.  If it is not then we can infer that
+            // the startKey was already cleaned up and therefore we have
+            // missed some records somehow and this needs to be flagged to
+            // the caller.
+            // TODO: we should throw a custom exception here (instead of
+            // IOException) that needs to be handled appropriately by
+            // callers
+            throw new IOException(
+                "Missing rows - start key not found (startKey=" + startKey
+                + ", foundKey=" + completedRequestInfoRow.getKey() + ")");
+          }
+        } else {
+          results.add(completedRequestInfoRow.getValue());
+        }
+      }
+    }
+    return results;
+  }
+
   private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
       throws OMException {
     try {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
index 12573c62569..e0a805f0afc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
@@ -17,19 +17,37 @@
 
 package org.apache.hadoop.ozone.om.eventlistener;
 
+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 
 /**
  * A narrow set of functionality we are ok with exposing to plugin
  * implementations.
  */
 public final class OMEventListenerPluginContextImpl implements 
OMEventListenerPluginContext {
-  private final OzoneManager ozoneManager; // NOPMD: unused.  will be used soon
+  private final OzoneManager ozoneManager;
 
   public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) {
     this.ozoneManager = ozoneManager;
   }
 
-  // TODO: fill this out with capabilities we would like to expose to
-  // plugin implementations.
+  @Override
+  public boolean isLeaderReady() {
+    return ozoneManager.isLeaderReady();
+  }
+
+  // TODO: should we allow plugins to pass in maxResults or just limit
+  // them to some predefined value for safety?  e.g. 10K
+  @Override
+  public List<OmCompletedRequestInfo> listCompletedRequestInfo(Long startKey, 
int maxResults) throws IOException {
+    return 
ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey, 
maxResults);
+  }
+
+  // TODO: it feels like this doesn't belong here
+  @Override
+  public String getThreadNamePrefix() {
+    return ozoneManager.getThreadNamePrefix();
+  }
 }
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 417f05064ef..4b42e075417 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -46,6 +46,7 @@
     <module>mini-cluster</module>
     <module>multitenancy-ranger</module>
     <module>ozone-manager</module>
+    <module>ozone-manager-plugins</module>
     <module>ozonefs</module>
     <module>ozonefs-common</module>
     <module>recon</module>
diff --git a/pom.xml b/pom.xml
index 7db5ff9b318..098e99c3273 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
     <jsp-api.version>2.1</jsp-api.version>
     <jsr311-api.version>1.1.1</jsr311-api.version>
     <junit5.version>5.14.2</junit5.version>
+    <kafka.version>3.9.2</kafka.version>
     <kerby.version>1.0.1</kerby.version>
     <kotlin.version>1.9.25</kotlin.version>
     <license-maven-plugin.version>2.7.1</license-maven-plugin.version>
@@ -995,6 +996,11 @@
         <artifactId>httpcore-nio</artifactId>
         <version>${httpcore.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.kerby</groupId>
         <artifactId>kerb-core</artifactId>
@@ -1272,6 +1278,11 @@
         <version>${ozone.version}</version>
         <type>test-jar</type>
       </dependency>
+      <dependency>
+        <groupId>org.apache.ozone</groupId>
+        <artifactId>ozone-manager-plugins</artifactId>
+        <version>${ozone.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.ozone</groupId>
         <artifactId>ozone-mini-cluster</artifactId>
@@ -2144,6 +2155,7 @@
                 -->
                 
<ignoredNonTestScopedDependency>com.fasterxml.jackson.core:jackson-databind:jar</ignoredNonTestScopedDependency>
                 
<ignoredNonTestScopedDependency>org.apache.commons:commons-compress:jar</ignoredNonTestScopedDependency>
+                
<ignoredNonTestScopedDependency>org.apache.hadoop:hadoop-common:jar</ignoredNonTestScopedDependency>
                 
<ignoredNonTestScopedDependency>org.apache.ozone:hdds-client:jar</ignoredNonTestScopedDependency>
                 
<ignoredNonTestScopedDependency>org.apache.ozone:ozone-interface-client:jar</ignoredNonTestScopedDependency>
                 
<ignoredNonTestScopedDependency>org.glassfish.jersey.core:jersey-common:jar</ignoredNonTestScopedDependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to