This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 88428f2bad [#6781] feat(lineage): support lineage sinks (#6891)
88428f2bad is described below

commit 88428f2bad1a1f67ced55b0c5551ac61d9f01117
Author: FANNG <[email protected]>
AuthorDate: Tue Apr 15 10:58:30 2025 +0800

    [#6781] feat(lineage): support lineage sinks (#6891)
    
    ### What changes were proposed in this pull request?
    
    Support lineage sinks, Linkage sinks manager will load all linkage sinks
    and register them as event listeners.
    
    ### Why are the changes needed?
    
    Fix: #6781
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    1. add UT
    2. setup a local environment to test spark linage
---
 .../gravitino/listener/AsyncQueueListener.java     |   9 +-
 .../org/apache/gravitino/listener/EventBus.java    |  27 ++++-
 .../gravitino/listener/EventListenerManager.java   |   7 +-
 .../listener/EventListenerPluginWrapper.java       |   2 +-
 .../gravitino/listener/api/event/EventWrapper.java |  53 +++++++++
 lineage/build.gradle.kts                           |   4 +-
 .../apache/gravitino/lineage/LineageConfig.java    |  15 +++
 .../apache/gravitino/lineage/LineageService.java   |   2 +
 .../gravitino/lineage/sink/LineageLogSink.java     |  18 +--
 .../lineage/sink/LineageSinkEventListener.java     |  72 ++++++++++++
 .../gravitino/lineage/sink/LineageSinkManager.java |  82 ++++++++++++-
 .../gravitino/lineage/TestLineageConfig.java       |   3 +
 .../lineage/sink/LineageSinkForTest.java}          |  35 ++++--
 .../lineage/sink/TestLineageSinkManager.java       | 128 +++++++++++++++++++++
 server-common/build.gradle.kts                     |   3 +
 .../gravitino/server/web/ObjectMapperProvider.java |   0
 16 files changed, 415 insertions(+), 45 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java 
b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
index 18043964dd..1bc24f445d 100644
--- a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
+++ b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 public class AsyncQueueListener implements EventListenerPlugin {
   private static final Logger LOG = 
LoggerFactory.getLogger(AsyncQueueListener.class);
   private static final String NAME_PREFIX = "async-queue-listener-";
+  private static final float HIGH_WATERMARK_RATIO = 0.9f;
 
   private final List<EventListenerPlugin> eventListeners;
   private final BlockingQueue<BaseEvent> queue;
@@ -54,6 +55,7 @@ public class AsyncQueueListener implements 
EventListenerPlugin {
   private final AtomicLong lastDropEventCounters = new AtomicLong(0);
   private Instant lastRecordDropEventTime;
   private final String asyncQueueListenerName;
+  private final int highWatermarkThreshold;
 
   public AsyncQueueListener(
       List<EventListenerPlugin> listeners,
@@ -65,6 +67,7 @@ public class AsyncQueueListener implements 
EventListenerPlugin {
     this.queue = new LinkedBlockingQueue<>(queueCapacity);
     this.asyncProcessor = new Thread(() -> processEvents());
     this.dispatcherJoinSeconds = dispatcherJoinSeconds;
+    this.highWatermarkThreshold = (int) (queueCapacity * HIGH_WATERMARK_RATIO);
     asyncProcessor.setDaemon(true);
     asyncProcessor.setName(asyncQueueListenerName);
   }
@@ -104,8 +107,12 @@ public class AsyncQueueListener implements 
EventListenerPlugin {
     eventListeners.forEach(listenerPlugin -> listenerPlugin.stop());
   }
 
+  public boolean isHighWatermark() {
+    return queue.size() > highWatermarkThreshold;
+  }
+
   @VisibleForTesting
-  List<EventListenerPlugin> getEventListeners() {
+  public List<EventListenerPlugin> getEventListeners() {
     return this.eventListeners;
   }
 
diff --git a/core/src/main/java/org/apache/gravitino/listener/EventBus.java 
b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
index d851dc2927..24277c8462 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventBus.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.listener;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.listener.api.EventListenerPlugin;
 import org.apache.gravitino.listener.api.event.BaseEvent;
@@ -33,12 +34,19 @@ import org.apache.gravitino.listener.api.event.PreEvent;
  * within its internal management.
  */
 public class EventBus {
-  // Holds instances of EventListenerPlugin. These instances can either be
-  // EventListenerPluginWrapper,
-  // which are meant for synchronous event listening, or AsyncQueueListener, 
designed for
-  // asynchronous event processing.
+  /**
+   * Holds all instances of {@link EventListenerPlugin}. These instances can 
either be {@link
+   * EventListenerPluginWrapper} which are used for synchronous event process, 
or {@link
+   * AsyncQueueListener} for asynchronous event processing.
+   */
   private final List<EventListenerPlugin> eventListeners;
 
+  /**
+   * Holds instances of {@link AsyncQueueListener}, mainly used to check the 
status of async queue,
+   * like {@link #isHighWatermark()}.
+   */
+  private final List<AsyncQueueListener> asyncQueueListeners;
+
   /**
    * Constructs an EventBus with a predefined list of event listeners.
    *
@@ -47,6 +55,11 @@ public class EventBus {
    */
   public EventBus(List<EventListenerPlugin> eventListeners) {
     this.eventListeners = eventListeners;
+    this.asyncQueueListeners =
+        eventListeners.stream()
+            .filter(AsyncQueueListener.class::isInstance)
+            .map(AsyncQueueListener.class::cast)
+            .collect(Collectors.toList());
   }
 
   /**
@@ -65,6 +78,10 @@ public class EventBus {
     }
   }
 
+  public boolean isHighWatermark() {
+    return 
asyncQueueListeners.stream().anyMatch(AsyncQueueListener::isHighWatermark);
+  }
+
   /**
    * Retrieves the list of registered post-event listeners. This method is 
primarily intended for
    * testing purposes to verify the correct registration and functioning of 
event listeners.
@@ -73,7 +90,7 @@ public class EventBus {
    *     EventBus.
    */
   @VisibleForTesting
-  List<EventListenerPlugin> getEventListeners() {
+  public List<EventListenerPlugin> getEventListeners() {
     return eventListeners;
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java 
b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
index e018f8c13c..224d4e2d16 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.gravitino.listener;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -51,9 +50,9 @@ import org.slf4j.LoggerFactory;
 public class EventListenerManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(EventListenerManager.class);
   public static final String GRAVITINO_EVENT_LISTENER_PREFIX = 
"gravitino.eventListener.";
-  static final String GRAVITINO_EVENT_LISTENER_NAMES = "names";
-  @VisibleForTesting static final String GRAVITINO_EVENT_LISTENER_CLASS = 
"class";
-  static final String GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY = 
"queueCapacity";
+  public static final String GRAVITINO_EVENT_LISTENER_NAMES = "names";
+  public static final String GRAVITINO_EVENT_LISTENER_CLASS = "class";
+  public static final String GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY = 
"queueCapacity";
   static final String GRAVITINO_EVENT_LISTENER_DISPATCHER_JOIN_SECONDS = 
"dispatcherJoinSeconds";
   private static final Splitter splitter = Splitter.on(",");
   private static final Joiner DOT = Joiner.on(".");
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
 
b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
index 8e0a2ffbc4..a170d727bb 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
@@ -95,7 +95,7 @@ public class EventListenerPluginWrapper implements 
EventListenerPlugin {
   }
 
   @VisibleForTesting
-  EventListenerPlugin getUserEventListener() {
+  public EventListenerPlugin getUserEventListener() {
     return userEventListener;
   }
 
diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java 
b/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java
new file mode 100644
index 0000000000..67a6bb6076
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+
+/**
+ * A generic wrapper class that extends {@link Event}. It serves to 
encapsulate an object of type
+ * {@code T}. Unlike events generated by Gravitino operations such as table 
creation, the {@link
+ * EventWrapper} is not directly produced by these actions. Instead, it is 
designed to dispatch
+ * events for internal modules, specifically for components like lineage sinks 
within the system.
+ *
+ * @param <T> The type of the object to be wrapped
+ */
+public class EventWrapper<T> extends Event {
+  private final T object;
+
+  /**
+   * Constructs an {@code EventWrapper} with the specified object.
+   *
+   * @param object the object to be wrapped
+   */
+  public EventWrapper(T object) {
+    super("mockUser", NameIdentifier.of("mock"));
+    this.object = object;
+  }
+
+  /**
+   * Retrieves the wrapped generic object.
+   *
+   * @return the wrapped object of type {@code T}
+   */
+  public T getObject() {
+    return object;
+  }
+}
diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
index dd462e3bf3..3706d42c70 100644
--- a/lineage/build.gradle.kts
+++ b/lineage/build.gradle.kts
@@ -28,9 +28,6 @@ dependencies {
   implementation(libs.bundles.jersey)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
-  implementation(libs.jackson.datatype.jdk8)
-  implementation(libs.jackson.datatype.jsr310)
-  implementation(libs.jackson.databind)
   implementation(libs.metrics.jersey2)
   implementation(libs.openlineage.java) {
     isTransitive = false
@@ -42,6 +39,7 @@ dependencies {
   testAnnotationProcessor(libs.lombok)
   testCompileOnly(libs.lombok)
 
+  testImplementation(libs.awaitility)
   testImplementation(libs.jersey.test.framework.core) {
     exclude(group = "org.junit.jupiter")
   }
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
index ac7185336b..e27c3d98da 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.lineage;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ public class LineageConfig extends Config {
 
   public static final String LINEAGE_CONFIG_PREFIX = "gravitino.lineage.";
   public static final String LINEAGE_CONFIG_SINKS = "sinks";
+  public static final String LINEAGE_SINK_QUEUE_CAPACITY = "sinkQueueCapacity";
   public static final String LINEAGE_CONFIG_SOURCE = "source";
   public static final String LINEAGE_SOURCE_CLASS_NAME = "sourceClass";
   public static final String LINEAGE_PROCESSOR_CLASS_NAME = "processorClass";
@@ -47,6 +49,8 @@ public class LineageConfig extends Config {
   public static final String LINEAGE_LOG_SINK_NAME = "log";
   public static final String LINEAGE_HTTP_SOURCE_NAME = "http";
 
+  @VisibleForTesting static final int LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT = 
10000;
+
   private static final Splitter splitter = Splitter.on(",");
 
   public static final ConfigEntry<String> SOURCE_NAME =
@@ -70,6 +74,13 @@ public class LineageConfig extends Config {
           .stringConf()
           .createWithDefault(LINEAGE_LOG_SINK_NAME);
 
+  public static final ConfigEntry<Integer> SINK_QUEUE_CAPACITY =
+      new ConfigBuilder(LINEAGE_SINK_QUEUE_CAPACITY)
+          .doc("The capacity of the total lineage sink queue")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .intConf()
+          .createWithDefault(LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT);
+
   public LineageConfig(Map<String, String> properties) {
     super(false);
     loadFromMap(properties, k -> true);
@@ -104,6 +115,10 @@ public class LineageConfig extends Config {
       m.put(LINEAGE_CONFIG_SINKS, sinkString);
     }
 
+    if (!m.containsKey(LINEAGE_SINK_QUEUE_CAPACITY)) {
+      m.put(LINEAGE_SINK_QUEUE_CAPACITY, 
String.valueOf(get(SINK_QUEUE_CAPACITY)));
+    }
+
     String logClassConfigKey =
         LineageConfig.LINEAGE_LOG_SINK_NAME + "." + 
LineageConfig.LINEAGE_SINK_CLASS_NAME;
     if (sinks.contains(LINEAGE_LOG_SINK_NAME) && 
!config.containsKey(logClassConfigKey)) {
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
index 2667d3e5ca..2c74456d06 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
@@ -55,9 +55,11 @@ public class LineageService implements LineageDispatcher, 
SupportsRESTPackages {
   public void close() {
     if (source != null) {
       source.close();
+      source = null;
     }
     if (sinkManager != null) {
       sinkManager.close();
+      sinkManager = null;
     }
   }
 
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
index cdff7822f7..2ea127bc62 100644
--- 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
@@ -19,31 +19,17 @@
 
 package org.apache.gravitino.lineage.sink;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.cfg.EnumFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import io.openlineage.server.OpenLineage.RunEvent;
 import org.apache.gravitino.lineage.Utils;
+import org.apache.gravitino.server.web.ObjectMapperProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LineageLogSink implements LineageSink {
   private static final Logger LOG = 
LoggerFactory.getLogger(LineageLogSink.class);
-  private ObjectMapper objectMapper =
-      JsonMapper.builder()
-          .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
-          .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
-          .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
-          .build()
-          .setSerializationInclusion(JsonInclude.Include.NON_NULL)
-          .registerModule(new JavaTimeModule())
-          .registerModule(new Jdk8Module());
+  private ObjectMapper objectMapper = ObjectMapperProvider.objectMapper();
   private LineageLogger logger = new LineageLogger();
 
   private static class LineageLogger {
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
new file mode 100644
index 0000000000..2ed4625ecc
--- /dev/null
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
@@ -0,0 +1,72 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.EventWrapper;
+import org.apache.gravitino.utils.ClassUtils;
+
+public class LineageSinkEventListener implements EventListenerPlugin {
+
+  private LineageSink lineageSink;
+
+  @Override
+  public void init(Map<String, String> properties) throws RuntimeException {
+    String sinkClassName = 
properties.get(LineageConfig.LINEAGE_SINK_CLASS_NAME);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(sinkClassName),
+        LineageConfig.LINEAGE_SINK_CLASS_NAME + " is not set.");
+    this.lineageSink = ClassUtils.loadClass(sinkClassName);
+    lineageSink.initialize(properties);
+  }
+
+  @Override
+  public void start() throws RuntimeException {}
+
+  @Override
+  public void stop() throws RuntimeException {
+    if (lineageSink != null) {
+      lineageSink.close();
+    }
+  }
+
+  @Override
+  public Mode mode() {
+    return Mode.ASYNC_ISOLATED;
+  }
+
+  @Override
+  public void onPostEvent(Event postEvent) throws RuntimeException {
+    EventWrapper<RunEvent> wrapper = (EventWrapper<RunEvent>) postEvent;
+    lineageSink.sink(wrapper.getObject());
+  }
+
+  @VisibleForTesting
+  LineageSink lineageSink() {
+    return lineageSink;
+  }
+}
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
index f01dcc80e7..19d07c4a93 100644
--- 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -19,23 +19,95 @@
 
 package org.apache.gravitino.lineage.sink;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import io.openlineage.server.OpenLineage.RunEvent;
 import java.io.Closeable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.event.EventWrapper;
 
-@SuppressWarnings("unused")
 public class LineageSinkManager implements Closeable {
+  private EventBus eventBus;
+  private final EventListenerManager eventListenerManager;
 
-  public void initialize(List<String> sinks, Map<String, String> 
LineageConfigs) {}
+  public LineageSinkManager() {
+    this.eventListenerManager = new EventListenerManager();
+  }
+
+  public void initialize(List<String> sinks, Map<String, String> 
LineageConfigs) {
+    Map<String, String> eventListenerConfigs =
+        transformToEventListenerConfigs(sinks, LineageConfigs);
+    eventListenerManager.init(eventListenerConfigs);
+    this.eventBus = eventListenerManager.createEventBus();
+    eventListenerManager.start();
+  }
 
   // Checks if the sink queue size surpasses the threshold to avoid 
overwhelming lineage sinks.
   public boolean isHighWatermark() {
-    return false;
+    return eventBus.isHighWatermark();
   }
 
-  public void sink(RunEvent runEvent) {}
+  /**
+   * The lineage event is dispatched by event listener system with a dedicated 
async event listener
+   * {@link LineageSinkEventListener} which wrap the lineage sink class. 
Consequently, we must
+   * convert the lineage configuration into event listener configurations. 
This conversion includes
+   * details such as event listener names, event listener classes, and the 
capacity of the
+   * asynchronous event queue.
+   */
+  @VisibleForTesting
+  static Map<String, String> transformToEventListenerConfigs(
+      List<String> sinks, Map<String, String> lineageConfigs) {
+    Map<String, String> eventListenerConfigs = new HashMap<>();
+    eventListenerConfigs.putAll(generateEventListenerConfigs(sinks, 
lineageConfigs));
+    eventListenerConfigs.putAll(lineageConfigs);
+    return eventListenerConfigs;
+  }
+
+  private static Map<String, String> generateEventListenerConfigs(
+      List<String> sinks, Map<String, String> lineageConfigs) {
+    HashMap eventListenerConfigs = new HashMap();
+    if (sinks.isEmpty()) {
+      return eventListenerConfigs;
+    }
+
+    String queueCapacity = 
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is 
not set");
+    int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+    eventListenerConfigs.put(
+        EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES, String.join(",", 
sinks));
+    sinks.forEach(
+        sinkName -> {
+          eventListenerConfigs.put(
+              sinkName + "." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS,
+              LineageSinkEventListener.class.getName());
+          eventListenerConfigs.put(
+              sinkName + "." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY,
+              String.valueOf(capacityPerSink));
+        });
+    return eventListenerConfigs;
+  }
+
+  public void sink(RunEvent runEvent) {
+    eventBus.dispatchEvent(new EventWrapper(runEvent));
+  }
 
   @Override
-  public void close() {}
+  public void close() {
+    if (eventListenerManager != null) {
+      eventListenerManager.stop();
+    }
+  }
+
+  @VisibleForTesting
+  EventBus eventBus() {
+    return eventBus;
+  }
 }
diff --git 
a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java 
b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
index d4dbfa8a3a..26f52733b2 100644
--- a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
+++ b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
@@ -64,6 +64,9 @@ public class TestLineageConfig {
         sinkConfigs.get(
             LineageConfig.LINEAGE_LOG_SINK_NAME + "." + 
LineageConfig.LINEAGE_SINK_CLASS_NAME);
     Assertions.assertEquals(LineageLogSink.class.getName(), className);
+    String capacity = 
sinkConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
+    Assertions.assertEquals(
+        LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT, 
Integer.parseInt(capacity));
 
     // config multi sinks
     Map<String, String> config2 =
diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
similarity index 57%
copy from 
lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
copy to 
lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
index f01dcc80e7..1f62ea9909 100644
--- 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++ 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
@@ -20,22 +20,37 @@
 package org.apache.gravitino.lineage.sink;
 
 import io.openlineage.server.OpenLineage.RunEvent;
-import java.io.Closeable;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.awaitility.Awaitility;
 
-@SuppressWarnings("unused")
-public class LineageSinkManager implements Closeable {
+@Getter
+public class LineageSinkForTest implements LineageSink {
+  private Map<String, String> configs;
+  private List<RunEvent> runEvents = new LinkedList<>();
 
-  public void initialize(List<String> sinks, Map<String, String> 
LineageConfigs) {}
+  @Override
+  public void initialize(Map<String, String> configs) {
+    this.configs = configs;
+  }
 
-  // Checks if the sink queue size surpasses the threshold to avoid 
overwhelming lineage sinks.
-  public boolean isHighWatermark() {
-    return false;
+  @Override
+  public void sink(RunEvent event) {
+    this.runEvents.add(event);
   }
 
-  public void sink(RunEvent runEvent) {}
+  public List<RunEvent> tryGetEvents() {
+    Awaitility.await()
+        .atMost(20, TimeUnit.SECONDS)
+        .pollInterval(10, TimeUnit.MILLISECONDS)
+        .until(() -> getEvents().size() > 0);
+    return getEvents();
+  }
 
-  @Override
-  public void close() {}
+  private List<RunEvent> getEvents() {
+    return runEvents;
+  }
 }
diff --git 
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
new file mode 100644
index 0000000000..6ccf6ac5d2
--- /dev/null
+++ 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
@@ -0,0 +1,128 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.AsyncQueueListener;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.EventListenerPluginWrapper;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestLineageSinkManager {
+
+  @SneakyThrows
+  @Test
+  void testLineageSinkManager() {
+    LineageSinkManager lineageSinkManager = new LineageSinkManager();
+    lineageSinkManager.initialize(Arrays.asList("sink1", "sink2"), 
getLineageSinkConfig());
+    lineageSinkManager.sink(getEvent());
+    EventBus eventBus = lineageSinkManager.eventBus();
+    List<EventListenerPlugin> listeners = eventBus.getEventListeners();
+    Assertions.assertEquals(2, listeners.size());
+
+    listeners.stream()
+        .forEach(
+            listener -> {
+              Assertions.assertTrue(listener instanceof AsyncQueueListener);
+              AsyncQueueListener asyncQueueListener = (AsyncQueueListener) 
listener;
+              List<EventListenerPlugin> internalListeners = 
asyncQueueListener.getEventListeners();
+              Assertions.assertEquals(1, internalListeners.size());
+              Assertions.assertTrue(internalListeners.get(0) instanceof 
EventListenerPluginWrapper);
+              EventListenerPlugin userListener =
+                  ((EventListenerPluginWrapper) 
internalListeners.get(0)).getUserEventListener();
+              Assertions.assertTrue(userListener instanceof 
LineageSinkEventListener);
+              LineageSink sink = ((LineageSinkEventListener) 
userListener).lineageSink();
+              Assertions.assertTrue(sink instanceof LineageSinkForTest);
+              checkLineageSink((LineageSinkForTest) sink);
+            });
+  }
+
+  @Test
+  void testTransformToEventListenerConfigs() {
+    Map<String, String> configs =
+        LineageSinkManager.transformToEventListenerConfigs(
+            Arrays.asList("sink1", "sink2"), getLineageSinkConfig());
+
+    Assertions.assertEquals(
+        "sink1,sink2", 
configs.get(EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES));
+
+    Assertions.assertEquals(
+        "500",
+        configs.get("sink1." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY));
+    Assertions.assertEquals(
+        LineageSinkEventListener.class.getName(),
+        configs.get("sink1." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS));
+    Assertions.assertEquals("sink1", configs.get("sink1.name"));
+    Assertions.assertEquals("a", configs.get("sink1.a"));
+
+    Assertions.assertEquals(
+        "500",
+        configs.get("sink2." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY));
+    Assertions.assertEquals(
+        LineageSinkEventListener.class.getName(),
+        configs.get("sink2." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS));
+    Assertions.assertEquals("b", configs.get("sink2.a"));
+  }
+
+  private void checkLineageSink(LineageSinkForTest sink) {
+    Map<String, String> configs = sink.getConfigs();
+    Assertions.assertTrue(configs.containsKey("name"));
+
+    String name = configs.get("name");
+    Assertions.assertTrue("sink1".equals(name) || "sink2".equals(name));
+    if ("sink1".equals(name)) {
+      Assertions.assertEquals("a", configs.get("a"));
+    } else if ("sink2".equals(name)) {
+      Assertions.assertEquals("b", configs.get("a"));
+    }
+
+    List<RunEvent> events = sink.tryGetEvents();
+    Assertions.assertEquals(1, events.size());
+  }
+
+  private RunEvent getEvent() {
+    return Mockito.mock(RunEvent.class);
+  }
+
+  private Map<String, String> getLineageSinkConfig() {
+    Map<String, String> lineageSinkConfigs = new HashMap<>();
+    lineageSinkConfigs.put(
+        "sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME, 
LineageSinkForTest.class.getName());
+    lineageSinkConfigs.put("sink1.a", "a");
+    lineageSinkConfigs.put("sink1.name", "sink1");
+    lineageSinkConfigs.put(
+        "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME, 
LineageSinkForTest.class.getName());
+    lineageSinkConfigs.put("sink2.a", "b");
+    lineageSinkConfigs.put("sink2.name", "sink2");
+    lineageSinkConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "1000");
+
+    return lineageSinkConfigs;
+  }
+}
diff --git a/server-common/build.gradle.kts b/server-common/build.gradle.kts
index 22d2eb29d7..ab4908ff31 100644
--- a/server-common/build.gradle.kts
+++ b/server-common/build.gradle.kts
@@ -39,6 +39,9 @@ dependencies {
   implementation(libs.bundles.metrics)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
+  implementation(libs.jackson.datatype.jdk8)
+  implementation(libs.jackson.datatype.jsr310)
+  implementation(libs.jackson.databind)
   implementation(libs.prometheus.servlet)
 
   testImplementation(libs.commons.io)
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
 
b/server-common/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
similarity index 100%
rename from 
server/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
rename to 
server-common/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java

Reply via email to