This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 88428f2bad [#6781] feat(lineage): support lineage sinks (#6891)
88428f2bad is described below
commit 88428f2bad1a1f67ced55b0c5551ac61d9f01117
Author: FANNG <[email protected]>
AuthorDate: Tue Apr 15 10:58:30 2025 +0800
[#6781] feat(lineage): support lineage sinks (#6891)
### What changes were proposed in this pull request?
Support lineage sinks, Linkage sinks manager will load all linkage sinks
and register them as event listeners.
### Why are the changes needed?
Fix: #6781
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
1. add UT
2. setup a local environment to test spark linage
---
.../gravitino/listener/AsyncQueueListener.java | 9 +-
.../org/apache/gravitino/listener/EventBus.java | 27 ++++-
.../gravitino/listener/EventListenerManager.java | 7 +-
.../listener/EventListenerPluginWrapper.java | 2 +-
.../gravitino/listener/api/event/EventWrapper.java | 53 +++++++++
lineage/build.gradle.kts | 4 +-
.../apache/gravitino/lineage/LineageConfig.java | 15 +++
.../apache/gravitino/lineage/LineageService.java | 2 +
.../gravitino/lineage/sink/LineageLogSink.java | 18 +--
.../lineage/sink/LineageSinkEventListener.java | 72 ++++++++++++
.../gravitino/lineage/sink/LineageSinkManager.java | 82 ++++++++++++-
.../gravitino/lineage/TestLineageConfig.java | 3 +
.../lineage/sink/LineageSinkForTest.java} | 35 ++++--
.../lineage/sink/TestLineageSinkManager.java | 128 +++++++++++++++++++++
server-common/build.gradle.kts | 3 +
.../gravitino/server/web/ObjectMapperProvider.java | 0
16 files changed, 415 insertions(+), 45 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
index 18043964dd..1bc24f445d 100644
--- a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
+++ b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
public class AsyncQueueListener implements EventListenerPlugin {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncQueueListener.class);
private static final String NAME_PREFIX = "async-queue-listener-";
+ private static final float HIGH_WATERMARK_RATIO = 0.9f;
private final List<EventListenerPlugin> eventListeners;
private final BlockingQueue<BaseEvent> queue;
@@ -54,6 +55,7 @@ public class AsyncQueueListener implements
EventListenerPlugin {
private final AtomicLong lastDropEventCounters = new AtomicLong(0);
private Instant lastRecordDropEventTime;
private final String asyncQueueListenerName;
+ private final int highWatermarkThreshold;
public AsyncQueueListener(
List<EventListenerPlugin> listeners,
@@ -65,6 +67,7 @@ public class AsyncQueueListener implements
EventListenerPlugin {
this.queue = new LinkedBlockingQueue<>(queueCapacity);
this.asyncProcessor = new Thread(() -> processEvents());
this.dispatcherJoinSeconds = dispatcherJoinSeconds;
+ this.highWatermarkThreshold = (int) (queueCapacity * HIGH_WATERMARK_RATIO);
asyncProcessor.setDaemon(true);
asyncProcessor.setName(asyncQueueListenerName);
}
@@ -104,8 +107,12 @@ public class AsyncQueueListener implements
EventListenerPlugin {
eventListeners.forEach(listenerPlugin -> listenerPlugin.stop());
}
+ public boolean isHighWatermark() {
+ return queue.size() > highWatermarkThreshold;
+ }
+
@VisibleForTesting
- List<EventListenerPlugin> getEventListeners() {
+ public List<EventListenerPlugin> getEventListeners() {
return this.eventListeners;
}
diff --git a/core/src/main/java/org/apache/gravitino/listener/EventBus.java
b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
index d851dc2927..24277c8462 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventBus.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventBus.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.listener;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.BaseEvent;
@@ -33,12 +34,19 @@ import org.apache.gravitino.listener.api.event.PreEvent;
* within its internal management.
*/
public class EventBus {
- // Holds instances of EventListenerPlugin. These instances can either be
- // EventListenerPluginWrapper,
- // which are meant for synchronous event listening, or AsyncQueueListener,
designed for
- // asynchronous event processing.
+ /**
+ * Holds all instances of {@link EventListenerPlugin}. These instances can
either be {@link
+ * EventListenerPluginWrapper} which are used for synchronous event process,
or {@link
+ * AsyncQueueListener} for asynchronous event processing.
+ */
private final List<EventListenerPlugin> eventListeners;
+ /**
+ * Holds instances of {@link AsyncQueueListener}, mainly used to check the
status of async queue,
+ * like {@link #isHighWatermark()}.
+ */
+ private final List<AsyncQueueListener> asyncQueueListeners;
+
/**
* Constructs an EventBus with a predefined list of event listeners.
*
@@ -47,6 +55,11 @@ public class EventBus {
*/
public EventBus(List<EventListenerPlugin> eventListeners) {
this.eventListeners = eventListeners;
+ this.asyncQueueListeners =
+ eventListeners.stream()
+ .filter(AsyncQueueListener.class::isInstance)
+ .map(AsyncQueueListener.class::cast)
+ .collect(Collectors.toList());
}
/**
@@ -65,6 +78,10 @@ public class EventBus {
}
}
+ public boolean isHighWatermark() {
+ return
asyncQueueListeners.stream().anyMatch(AsyncQueueListener::isHighWatermark);
+ }
+
/**
* Retrieves the list of registered post-event listeners. This method is
primarily intended for
* testing purposes to verify the correct registration and functioning of
event listeners.
@@ -73,7 +90,7 @@ public class EventBus {
* EventBus.
*/
@VisibleForTesting
- List<EventListenerPlugin> getEventListeners() {
+ public List<EventListenerPlugin> getEventListeners() {
return eventListeners;
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
index e018f8c13c..224d4e2d16 100644
--- a/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
+++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerManager.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.listener;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -51,9 +50,9 @@ import org.slf4j.LoggerFactory;
public class EventListenerManager {
private static final Logger LOG =
LoggerFactory.getLogger(EventListenerManager.class);
public static final String GRAVITINO_EVENT_LISTENER_PREFIX =
"gravitino.eventListener.";
- static final String GRAVITINO_EVENT_LISTENER_NAMES = "names";
- @VisibleForTesting static final String GRAVITINO_EVENT_LISTENER_CLASS =
"class";
- static final String GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY =
"queueCapacity";
+ public static final String GRAVITINO_EVENT_LISTENER_NAMES = "names";
+ public static final String GRAVITINO_EVENT_LISTENER_CLASS = "class";
+ public static final String GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY =
"queueCapacity";
static final String GRAVITINO_EVENT_LISTENER_DISPATCHER_JOIN_SECONDS =
"dispatcherJoinSeconds";
private static final Splitter splitter = Splitter.on(",");
private static final Joiner DOT = Joiner.on(".");
diff --git
a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
index 8e0a2ffbc4..a170d727bb 100644
---
a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
+++
b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java
@@ -95,7 +95,7 @@ public class EventListenerPluginWrapper implements
EventListenerPlugin {
}
@VisibleForTesting
- EventListenerPlugin getUserEventListener() {
+ public EventListenerPlugin getUserEventListener() {
return userEventListener;
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java
new file mode 100644
index 0000000000..67a6bb6076
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/EventWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+
+/**
+ * A generic wrapper class that extends {@link Event}. It serves to
encapsulate an object of type
+ * {@code T}. Unlike events generated by Gravitino operations such as table
creation, the {@link
+ * EventWrapper} is not directly produced by these actions. Instead, it is
designed to dispatch
+ * events for internal modules, specifically for components like lineage sinks
within the system.
+ *
+ * @param <T> The type of the object to be wrapped
+ */
+public class EventWrapper<T> extends Event {
+ private final T object;
+
+ /**
+ * Constructs an {@code EventWrapper} with the specified object.
+ *
+ * @param object the object to be wrapped
+ */
+ public EventWrapper(T object) {
+ super("mockUser", NameIdentifier.of("mock"));
+ this.object = object;
+ }
+
+ /**
+ * Retrieves the wrapped generic object.
+ *
+ * @return the wrapped object of type {@code T}
+ */
+ public T getObject() {
+ return object;
+ }
+}
diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
index dd462e3bf3..3706d42c70 100644
--- a/lineage/build.gradle.kts
+++ b/lineage/build.gradle.kts
@@ -28,9 +28,6 @@ dependencies {
implementation(libs.bundles.jersey)
implementation(libs.commons.lang3)
implementation(libs.guava)
- implementation(libs.jackson.datatype.jdk8)
- implementation(libs.jackson.datatype.jsr310)
- implementation(libs.jackson.databind)
implementation(libs.metrics.jersey2)
implementation(libs.openlineage.java) {
isTransitive = false
@@ -42,6 +39,7 @@ dependencies {
testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)
+ testImplementation(libs.awaitility)
testImplementation(libs.jersey.test.framework.core) {
exclude(group = "org.junit.jupiter")
}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
index ac7185336b..e27c3d98da 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.lineage;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.util.HashMap;
@@ -38,6 +39,7 @@ public class LineageConfig extends Config {
public static final String LINEAGE_CONFIG_PREFIX = "gravitino.lineage.";
public static final String LINEAGE_CONFIG_SINKS = "sinks";
+ public static final String LINEAGE_SINK_QUEUE_CAPACITY = "sinkQueueCapacity";
public static final String LINEAGE_CONFIG_SOURCE = "source";
public static final String LINEAGE_SOURCE_CLASS_NAME = "sourceClass";
public static final String LINEAGE_PROCESSOR_CLASS_NAME = "processorClass";
@@ -47,6 +49,8 @@ public class LineageConfig extends Config {
public static final String LINEAGE_LOG_SINK_NAME = "log";
public static final String LINEAGE_HTTP_SOURCE_NAME = "http";
+ @VisibleForTesting static final int LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT =
10000;
+
private static final Splitter splitter = Splitter.on(",");
public static final ConfigEntry<String> SOURCE_NAME =
@@ -70,6 +74,13 @@ public class LineageConfig extends Config {
.stringConf()
.createWithDefault(LINEAGE_LOG_SINK_NAME);
+ public static final ConfigEntry<Integer> SINK_QUEUE_CAPACITY =
+ new ConfigBuilder(LINEAGE_SINK_QUEUE_CAPACITY)
+ .doc("The capacity of the total lineage sink queue")
+ .version(ConfigConstants.VERSION_0_9_0)
+ .intConf()
+ .createWithDefault(LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT);
+
public LineageConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
@@ -104,6 +115,10 @@ public class LineageConfig extends Config {
m.put(LINEAGE_CONFIG_SINKS, sinkString);
}
+ if (!m.containsKey(LINEAGE_SINK_QUEUE_CAPACITY)) {
+ m.put(LINEAGE_SINK_QUEUE_CAPACITY,
String.valueOf(get(SINK_QUEUE_CAPACITY)));
+ }
+
String logClassConfigKey =
LineageConfig.LINEAGE_LOG_SINK_NAME + "." +
LineageConfig.LINEAGE_SINK_CLASS_NAME;
if (sinks.contains(LINEAGE_LOG_SINK_NAME) &&
!config.containsKey(logClassConfigKey)) {
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
index 2667d3e5ca..2c74456d06 100644
--- a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
@@ -55,9 +55,11 @@ public class LineageService implements LineageDispatcher,
SupportsRESTPackages {
public void close() {
if (source != null) {
source.close();
+ source = null;
}
if (sinkManager != null) {
sinkManager.close();
+ sinkManager = null;
}
}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
index cdff7822f7..2ea127bc62 100644
---
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
@@ -19,31 +19,17 @@
package org.apache.gravitino.lineage.sink;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.cfg.EnumFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.openlineage.server.OpenLineage.RunEvent;
import org.apache.gravitino.lineage.Utils;
+import org.apache.gravitino.server.web.ObjectMapperProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LineageLogSink implements LineageSink {
private static final Logger LOG =
LoggerFactory.getLogger(LineageLogSink.class);
- private ObjectMapper objectMapper =
- JsonMapper.builder()
- .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
- .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
- .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
- .build()
- .setSerializationInclusion(JsonInclude.Include.NON_NULL)
- .registerModule(new JavaTimeModule())
- .registerModule(new Jdk8Module());
+ private ObjectMapper objectMapper = ObjectMapperProvider.objectMapper();
private LineageLogger logger = new LineageLogger();
private static class LineageLogger {
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
new file mode 100644
index 0000000000..2ed4625ecc
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkEventListener.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.EventWrapper;
+import org.apache.gravitino.utils.ClassUtils;
+
+public class LineageSinkEventListener implements EventListenerPlugin {
+
+ private LineageSink lineageSink;
+
+ @Override
+ public void init(Map<String, String> properties) throws RuntimeException {
+ String sinkClassName =
properties.get(LineageConfig.LINEAGE_SINK_CLASS_NAME);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(sinkClassName),
+ LineageConfig.LINEAGE_SINK_CLASS_NAME + " is not set.");
+ this.lineageSink = ClassUtils.loadClass(sinkClassName);
+ lineageSink.initialize(properties);
+ }
+
+ @Override
+ public void start() throws RuntimeException {}
+
+ @Override
+ public void stop() throws RuntimeException {
+ if (lineageSink != null) {
+ lineageSink.close();
+ }
+ }
+
+ @Override
+ public Mode mode() {
+ return Mode.ASYNC_ISOLATED;
+ }
+
+ @Override
+ public void onPostEvent(Event postEvent) throws RuntimeException {
+ EventWrapper<RunEvent> wrapper = (EventWrapper<RunEvent>) postEvent;
+ lineageSink.sink(wrapper.getObject());
+ }
+
+ @VisibleForTesting
+ LineageSink lineageSink() {
+ return lineageSink;
+ }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
index f01dcc80e7..19d07c4a93 100644
---
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -19,23 +19,95 @@
package org.apache.gravitino.lineage.sink;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import io.openlineage.server.OpenLineage.RunEvent;
import java.io.Closeable;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.api.event.EventWrapper;
-@SuppressWarnings("unused")
public class LineageSinkManager implements Closeable {
+ private EventBus eventBus;
+ private final EventListenerManager eventListenerManager;
- public void initialize(List<String> sinks, Map<String, String>
LineageConfigs) {}
+ public LineageSinkManager() {
+ this.eventListenerManager = new EventListenerManager();
+ }
+
+ public void initialize(List<String> sinks, Map<String, String>
LineageConfigs) {
+ Map<String, String> eventListenerConfigs =
+ transformToEventListenerConfigs(sinks, LineageConfigs);
+ eventListenerManager.init(eventListenerConfigs);
+ this.eventBus = eventListenerManager.createEventBus();
+ eventListenerManager.start();
+ }
// Checks if the sink queue size surpasses the threshold to avoid
overwhelming lineage sinks.
public boolean isHighWatermark() {
- return false;
+ return eventBus.isHighWatermark();
}
- public void sink(RunEvent runEvent) {}
+ /**
+ * The lineage event is dispatched by event listener system with a dedicated
async event listener
+ * {@link LineageSinkEventListener} which wrap the lineage sink class.
Consequently, we must
+ * convert the lineage configuration into event listener configurations.
This conversion includes
+ * details such as event listener names, event listener classes, and the
capacity of the
+ * asynchronous event queue.
+ */
+ @VisibleForTesting
+ static Map<String, String> transformToEventListenerConfigs(
+ List<String> sinks, Map<String, String> lineageConfigs) {
+ Map<String, String> eventListenerConfigs = new HashMap<>();
+ eventListenerConfigs.putAll(generateEventListenerConfigs(sinks,
lineageConfigs));
+ eventListenerConfigs.putAll(lineageConfigs);
+ return eventListenerConfigs;
+ }
+
+ private static Map<String, String> generateEventListenerConfigs(
+ List<String> sinks, Map<String, String> lineageConfigs) {
+ HashMap eventListenerConfigs = new HashMap();
+ if (sinks.isEmpty()) {
+ return eventListenerConfigs;
+ }
+
+ String queueCapacity =
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is
not set");
+ int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+ eventListenerConfigs.put(
+ EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES, String.join(",",
sinks));
+ sinks.forEach(
+ sinkName -> {
+ eventListenerConfigs.put(
+ sinkName + "." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS,
+ LineageSinkEventListener.class.getName());
+ eventListenerConfigs.put(
+ sinkName + "." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY,
+ String.valueOf(capacityPerSink));
+ });
+ return eventListenerConfigs;
+ }
+
+ public void sink(RunEvent runEvent) {
+ eventBus.dispatchEvent(new EventWrapper(runEvent));
+ }
@Override
- public void close() {}
+ public void close() {
+ if (eventListenerManager != null) {
+ eventListenerManager.stop();
+ }
+ }
+
+ @VisibleForTesting
+ EventBus eventBus() {
+ return eventBus;
+ }
}
diff --git
a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
index d4dbfa8a3a..26f52733b2 100644
--- a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
+++ b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
@@ -64,6 +64,9 @@ public class TestLineageConfig {
sinkConfigs.get(
LineageConfig.LINEAGE_LOG_SINK_NAME + "." +
LineageConfig.LINEAGE_SINK_CLASS_NAME);
Assertions.assertEquals(LineageLogSink.class.getName(), className);
+ String capacity =
sinkConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
+ Assertions.assertEquals(
+ LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY_DEFAULT,
Integer.parseInt(capacity));
// config multi sinks
Map<String, String> config2 =
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
similarity index 57%
copy from
lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
copy to
lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
index f01dcc80e7..1f62ea9909 100644
---
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/LineageSinkForTest.java
@@ -20,22 +20,37 @@
package org.apache.gravitino.lineage.sink;
import io.openlineage.server.OpenLineage.RunEvent;
-import java.io.Closeable;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.awaitility.Awaitility;
-@SuppressWarnings("unused")
-public class LineageSinkManager implements Closeable {
+@Getter
+public class LineageSinkForTest implements LineageSink {
+ private Map<String, String> configs;
+ private List<RunEvent> runEvents = new LinkedList<>();
- public void initialize(List<String> sinks, Map<String, String>
LineageConfigs) {}
+ @Override
+ public void initialize(Map<String, String> configs) {
+ this.configs = configs;
+ }
- // Checks if the sink queue size surpasses the threshold to avoid
overwhelming lineage sinks.
- public boolean isHighWatermark() {
- return false;
+ @Override
+ public void sink(RunEvent event) {
+ this.runEvents.add(event);
}
- public void sink(RunEvent runEvent) {}
+ public List<RunEvent> tryGetEvents() {
+ Awaitility.await()
+ .atMost(20, TimeUnit.SECONDS)
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .until(() -> getEvents().size() > 0);
+ return getEvents();
+ }
- @Override
- public void close() {}
+ private List<RunEvent> getEvents() {
+ return runEvents;
+ }
}
diff --git
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
new file mode 100644
index 0000000000..6ccf6ac5d2
--- /dev/null
+++
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.listener.AsyncQueueListener;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.EventListenerManager;
+import org.apache.gravitino.listener.EventListenerPluginWrapper;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestLineageSinkManager {
+
+ @SneakyThrows
+ @Test
+ void testLineageSinkManager() {
+ LineageSinkManager lineageSinkManager = new LineageSinkManager();
+ lineageSinkManager.initialize(Arrays.asList("sink1", "sink2"),
getLineageSinkConfig());
+ lineageSinkManager.sink(getEvent());
+ EventBus eventBus = lineageSinkManager.eventBus();
+ List<EventListenerPlugin> listeners = eventBus.getEventListeners();
+ Assertions.assertEquals(2, listeners.size());
+
+ listeners.stream()
+ .forEach(
+ listener -> {
+ Assertions.assertTrue(listener instanceof AsyncQueueListener);
+ AsyncQueueListener asyncQueueListener = (AsyncQueueListener)
listener;
+ List<EventListenerPlugin> internalListeners =
asyncQueueListener.getEventListeners();
+ Assertions.assertEquals(1, internalListeners.size());
+ Assertions.assertTrue(internalListeners.get(0) instanceof
EventListenerPluginWrapper);
+ EventListenerPlugin userListener =
+ ((EventListenerPluginWrapper)
internalListeners.get(0)).getUserEventListener();
+ Assertions.assertTrue(userListener instanceof
LineageSinkEventListener);
+ LineageSink sink = ((LineageSinkEventListener)
userListener).lineageSink();
+ Assertions.assertTrue(sink instanceof LineageSinkForTest);
+ checkLineageSink((LineageSinkForTest) sink);
+ });
+ }
+
+ @Test
+ void testTransformToEventListenerConfigs() {
+ Map<String, String> configs =
+ LineageSinkManager.transformToEventListenerConfigs(
+ Arrays.asList("sink1", "sink2"), getLineageSinkConfig());
+
+ Assertions.assertEquals(
+ "sink1,sink2",
configs.get(EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES));
+
+ Assertions.assertEquals(
+ "500",
+ configs.get("sink1." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY));
+ Assertions.assertEquals(
+ LineageSinkEventListener.class.getName(),
+ configs.get("sink1." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS));
+ Assertions.assertEquals("sink1", configs.get("sink1.name"));
+ Assertions.assertEquals("a", configs.get("sink1.a"));
+
+ Assertions.assertEquals(
+ "500",
+ configs.get("sink2." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY));
+ Assertions.assertEquals(
+ LineageSinkEventListener.class.getName(),
+ configs.get("sink2." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_CLASS));
+ Assertions.assertEquals("b", configs.get("sink2.a"));
+ }
+
+ private void checkLineageSink(LineageSinkForTest sink) {
+ Map<String, String> configs = sink.getConfigs();
+ Assertions.assertTrue(configs.containsKey("name"));
+
+ String name = configs.get("name");
+ Assertions.assertTrue("sink1".equals(name) || "sink2".equals(name));
+ if ("sink1".equals(name)) {
+ Assertions.assertEquals("a", configs.get("a"));
+ } else if ("sink2".equals(name)) {
+ Assertions.assertEquals("b", configs.get("a"));
+ }
+
+ List<RunEvent> events = sink.tryGetEvents();
+ Assertions.assertEquals(1, events.size());
+ }
+
+ private RunEvent getEvent() {
+ return Mockito.mock(RunEvent.class);
+ }
+
+ private Map<String, String> getLineageSinkConfig() {
+ Map<String, String> lineageSinkConfigs = new HashMap<>();
+ lineageSinkConfigs.put(
+ "sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
LineageSinkForTest.class.getName());
+ lineageSinkConfigs.put("sink1.a", "a");
+ lineageSinkConfigs.put("sink1.name", "sink1");
+ lineageSinkConfigs.put(
+ "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
LineageSinkForTest.class.getName());
+ lineageSinkConfigs.put("sink2.a", "b");
+ lineageSinkConfigs.put("sink2.name", "sink2");
+ lineageSinkConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "1000");
+
+ return lineageSinkConfigs;
+ }
+}
diff --git a/server-common/build.gradle.kts b/server-common/build.gradle.kts
index 22d2eb29d7..ab4908ff31 100644
--- a/server-common/build.gradle.kts
+++ b/server-common/build.gradle.kts
@@ -39,6 +39,9 @@ dependencies {
implementation(libs.bundles.metrics)
implementation(libs.commons.lang3)
implementation(libs.guava)
+ implementation(libs.jackson.datatype.jdk8)
+ implementation(libs.jackson.datatype.jsr310)
+ implementation(libs.jackson.databind)
implementation(libs.prometheus.servlet)
testImplementation(libs.commons.io)
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
b/server-common/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
similarity index 100%
rename from
server/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java
rename to
server-common/src/main/java/org/apache/gravitino/server/web/ObjectMapperProvider.java