This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 8bcfba0b4e [#6779] feat(core): Support lineage framework in Gravitino
(#6782)
8bcfba0b4e is described below
commit 8bcfba0b4ef0b433eee9d1ec8e4891e5c55c69b3
Author: FANNG <[email protected]>
AuthorDate: Tue Apr 8 12:18:50 2025 +0800
[#6779] feat(core): Support lineage framework in Gravitino (#6782)
### What changes were proposed in this pull request?
Support the lineage framework in Gravitino. The lineage endpoint and the
lineage sink manager will be proposed in separate PRs.
Total workflow draft PR: https://github.com/apache/gravitino/pull/6723
The main work flow:
1. Gravitino server creates lineage service which manages lineage source
and lineage sinks.
2. lineage source implementation receives lineage run event and
dispatches to lineage service.
3. lineage service processes the run event and dispatches it to the lineage
sink manager.
4. lineage sink manager manages the life cycle of the lineage sinks and
dispatches run events to them.
### Why are the changes needed?
Fix: #6779
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Set up a Spark & Marquez environment and tested the workflow.
---
LICENSE.bin | 1 +
conf/log4j2.properties.template | 24 ++++
.../org/apache/gravitino/utils/ClassUtils.java | 30 +++++
gradle/libs.versions.toml | 2 +
lineage/build.gradle.kts | 41 +++++++
.../apache/gravitino/lineage/LineageConfig.java | 132 +++++++++++++++++++++
.../gravitino/lineage/LineageDispatcher.java | 57 +++++++++
.../apache/gravitino/lineage/LineageService.java | 82 +++++++++++++
.../lineage/processor/LineageProcessor.java | 34 ++++++
.../gravitino/lineage/processor/NoopProcessor.java | 30 +++++
.../gravitino/lineage/sink/LineageLogSink.java | 73 ++++++++++++
.../apache/gravitino/lineage/sink/LineageSink.java | 46 +++++++
.../gravitino/lineage/sink/LineageSinkManager.java | 41 +++++++
.../lineage/source/HTTPLineageSource.java | 36 ++++++
.../gravitino/lineage/source/LineageSource.java | 43 +++++++
.../gravitino/lineage/TestLineageConfig.java | 101 ++++++++++++++++
.../gravitino/server/web/SupportsRESTPackages.java | 36 ++++++
server/build.gradle.kts | 1 +
.../apache/gravitino/server/GravitinoServer.java | 28 +++--
settings.gradle.kts | 1 +
20 files changed, 832 insertions(+), 7 deletions(-)
diff --git a/LICENSE.bin b/LICENSE.bin
index d1dddd5279..58f7d0793d 100644
--- a/LICENSE.bin
+++ b/LICENSE.bin
@@ -258,6 +258,7 @@
Airlift
The Netty Project
Open Telemetry
+ Open Lineage
Trino
Jakarta Dependency Injection
Jakarta Bean Validation
diff --git a/conf/log4j2.properties.template b/conf/log4j2.properties.template
index bd26f33e37..f1eb30fcd9 100644
--- a/conf/log4j2.properties.template
+++ b/conf/log4j2.properties.template
@@ -47,6 +47,30 @@ appender.rolling.strategy.delete.ifLastModified.type =
IfLastModified
# Delete all files older than 30 days
appender.rolling.strategy.delete.ifLastModified.age = 30d
+
+## Use a separate file for the lineage log
+appender.lineage_file.type=RollingFile
+appender.lineage_file.name=lineage_file
+appender.lineage_file.fileName=${basePath}/gravitino_lineage.log
+appender.lineage_file.filePattern=${basePath}/gravitino_lineage_%d{yyyyMMdd}.log.gz
+appender.lineage_file.layout.type=PatternLayout
+appender.lineage_file.layout.pattern=[%d{yyyy-MM-dd HH:mm:ss}] %m%n
+appender.lineage_file.policies.type = Policies
+
+appender.lineage_file.policies.time.type = TimeBasedTriggeringPolicy
+appender.lineage_file.policies.time.interval = 1
+appender.lineage_file.policies.time.modulate = true
+appender.lineage_file.strategy.type = DefaultRolloverStrategy
+appender.lineage_file.strategy.delete.type = Delete
+appender.lineage_file.strategy.delete.basePath = ${basePath}
+appender.lineage_file.strategy.delete.maxDepth = 10
+appender.lineage_file.strategy.delete.ifLastModified.type = IfLastModified
+# Delete all files older than 30 days (IfLastModified needs an explicit age)
+appender.lineage_file.strategy.delete.ifLastModified.age = 30d
+
+# The logger name must be the binary name of the nested LineageLogger class, which is declared
+# inside org.apache.gravitino.lineage.sink.LineageLogSink.
+logger.lineage.name = org.apache.gravitino.lineage.sink.LineageLogSink$LineageLogger
+logger.lineage.level = info
+logger.lineage.appenderRef.lineage_file.ref = lineage_file
+logger.lineage.additivity = false
+
# Configure root logger
rootLogger.level = info
rootLogger.appenderRef.rolling.ref = fileLogger
diff --git a/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java
b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java
new file mode 100644
index 0000000000..7307394eb8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/utils/ClassUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.utils;
+
+/** Reflection helpers for creating objects from class names. */
+public class ClassUtils {
+
+  /**
+   * Creates a new instance of the named class by invoking its no-argument constructor.
+   *
+   * <p>The cast to {@code T} is unchecked: if the named class is not actually a {@code T}, the
+   * mismatch only surfaces as a {@link ClassCastException} at the call site that uses the result.
+   *
+   * @param className the fully qualified name of the class to instantiate.
+   * @param <T> the type the caller expects the new instance to have.
+   * @return a new instance of the named class.
+   * @throws RuntimeException if the class cannot be found or instantiated; the original failure
+   *     is preserved as the cause.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T loadClass(String className) {
+    try {
+      return (T) Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (Exception e) {
+      // Name the offending class so a misconfigured class name is easy to spot in the logs.
+      throw new RuntimeException("Failed to load and instantiate class: " + className, e);
+    }
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 530d502653..a157bb13d0 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -118,6 +118,7 @@ datanucleus-jdo = "3.2.0-m3"
hudi = "0.15.0"
google-auth = "1.28.0"
aliyun-credentials = "0.3.12"
+openlineage = "1.29.0"
[libraries]
aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref =
"awssdk" }
@@ -279,6 +280,7 @@ google-auth-credentials = { group = "com.google.auth", name
= "google-auth-libra
aliyun-credentials-sdk = { group='com.aliyun', name='credentials-java',
version.ref='aliyun-credentials' }
flinkjdbc = {group='org.apache.flink',name='flink-connector-jdbc',
version.ref='flinkjdbc'}
+openlineage-java= { group = "io.openlineage", name = "openlineage-java",
version.ref = "openlineage" }
[bundles]
log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core",
"log4j-12-api", "log4j-layout-template-json"]
diff --git a/lineage/build.gradle.kts b/lineage/build.gradle.kts
new file mode 100644
index 0000000000..21da55dea3
--- /dev/null
+++ b/lineage/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build script of the lineage module: a plain Java module.
+plugins {
+ id("java")
+}
+
+dependencies {
+ // Gravitino modules the lineage code compiles against.
+ implementation(project(":core"))
+ implementation(project(":server-common"))
+ implementation(libs.commons.lang3)
+ implementation(libs.guava)
+ implementation(libs.slf4j.api)
+ implementation(libs.jackson.datatype.jdk8)
+ implementation(libs.jackson.datatype.jsr310)
+ implementation(libs.jackson.databind)
+ implementation(libs.openlineage.java) {
+ // Only the openlineage-java artifact itself is added; its transitive dependencies are excluded.
+ isTransitive = false
+ }
+
+ testImplementation(libs.junit.jupiter.api)
+ testImplementation(libs.junit.jupiter.params)
+
+ testRuntimeOnly(libs.junit.jupiter.engine)
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
new file mode 100644
index 0000000000..ac7185336b
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.gravitino.lineage.processor.NoopProcessor;
+import org.apache.gravitino.lineage.sink.LineageLogSink;
+import org.apache.gravitino.lineage.source.HTTPLineageSource;
+
+/**
+ * Configuration of the lineage module: which source receives lineage events, which processor
+ * transforms them, and which sinks they are delivered to.
+ */
+public class LineageConfig extends Config {
+
+  public static final String LINEAGE_CONFIG_PREFIX = "gravitino.lineage.";
+  public static final String LINEAGE_CONFIG_SINKS = "sinks";
+  public static final String LINEAGE_CONFIG_SOURCE = "source";
+  public static final String LINEAGE_SOURCE_CLASS_NAME = "sourceClass";
+  public static final String LINEAGE_PROCESSOR_CLASS_NAME = "processorClass";
+  public static final String LINEAGE_SINK_CLASS_NAME = "sinkClass";
+  public static final String LINEAGE_HTTP_SOURCE_CLASS_NAME = HTTPLineageSource.class.getName();
+
+  public static final String LINEAGE_LOG_SINK_NAME = "log";
+  public static final String LINEAGE_HTTP_SOURCE_NAME = "http";
+
+  // Splits the comma separated sink list, trimming entries and dropping empty ones.
+  private static final Splitter SINK_NAME_SPLITTER =
+      Splitter.on(",").omitEmptyStrings().trimResults();
+
+  public static final ConfigEntry<String> SOURCE_NAME =
+      new ConfigBuilder(LINEAGE_CONFIG_SOURCE)
+          .doc("The name of lineage event source")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(LINEAGE_HTTP_SOURCE_NAME);
+
+  public static final ConfigEntry<String> PROCESSOR_CLASS =
+      new ConfigBuilder(LINEAGE_PROCESSOR_CLASS_NAME)
+          .doc("The class name of lineage event processor")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(NoopProcessor.class.getName());
+
+  public static final ConfigEntry<String> SINKS =
+      new ConfigBuilder(LINEAGE_CONFIG_SINKS)
+          .doc("The sinks of lineage event")
+          .version(ConfigConstants.VERSION_0_9_0)
+          .stringConf()
+          .createWithDefault(LINEAGE_LOG_SINK_NAME);
+
+  /**
+   * Creates a lineage configuration from the given properties. Every entry of the map is loaded;
+   * no key is filtered out.
+   *
+   * @param properties the lineage properties.
+   */
+  public LineageConfig(Map<String, String> properties) {
+    super(false);
+    loadFromMap(properties, k -> true);
+  }
+
+  /**
+   * Returns the name of the configured lineage source.
+   *
+   * @return the source name, {@code http} by default.
+   */
+  public String source() {
+    return get(SOURCE_NAME);
+  }
+
+  /**
+   * Resolves the implementation class of the configured lineage source. The built-in {@code http}
+   * source maps to {@link HTTPLineageSource}; any other source must declare its class under the
+   * key {@code <sourceName>.sourceClass}.
+   *
+   * @return the fully qualified class name of the lineage source.
+   * @throws IllegalArgumentException if a custom source is configured without a source class.
+   */
+  public String sourceClass() {
+    String sourceName = source();
+    if (LINEAGE_HTTP_SOURCE_NAME.equals(sourceName)) {
+      return LINEAGE_HTTP_SOURCE_CLASS_NAME;
+    }
+    String sourceConfig = sourceName + "." + LINEAGE_SOURCE_CLASS_NAME;
+    String sourceClass = getRawString(sourceConfig);
+    Preconditions.checkArgument(StringUtils.isNotBlank(sourceClass), sourceConfig + " is not set");
+    return sourceClass;
+  }
+
+  /**
+   * Returns the class name of the configured lineage processor.
+   *
+   * @return the processor class name, {@link NoopProcessor} by default.
+   */
+  public String processorClass() {
+    return get(PROCESSOR_CLASS);
+  }
+
+  /**
+   * Builds the configuration handed to the sinks: a copy of all configuration entries, completed
+   * with the effective {@code sinks} value and, when the built-in {@code log} sink is enabled
+   * without an explicit class, the class name of {@link LineageLogSink}.
+   *
+   * @return a new mutable map holding the sink configuration.
+   * @throws IllegalArgumentException if a configured sink has no {@code <sinkName>.sinkClass}
+   *     entry.
+   */
+  public Map<String, String> getSinkConfigs() {
+    List<String> sinkNames = sinks();
+    Map<String, String> sinkConfigs = new HashMap<>(getAllConfig());
+
+    // Make the default sink list explicit so consumers of the map do not need the default.
+    if (!sinkConfigs.containsKey(LINEAGE_CONFIG_SINKS)) {
+      sinkConfigs.put(LINEAGE_CONFIG_SINKS, get(SINKS));
+    }
+
+    // The built-in log sink does not need an explicit class configuration.
+    String logClassConfigKey = LINEAGE_LOG_SINK_NAME + "." + LINEAGE_SINK_CLASS_NAME;
+    if (sinkNames.contains(LINEAGE_LOG_SINK_NAME) && !sinkConfigs.containsKey(logClassConfigKey)) {
+      sinkConfigs.put(logClassConfigKey, LineageLogSink.class.getName());
+    }
+
+    for (String sinkName : sinkNames) {
+      String sinkClassConfig = sinkName + "." + LINEAGE_SINK_CLASS_NAME;
+      Preconditions.checkArgument(
+          sinkConfigs.containsKey(sinkClassConfig), sinkClassConfig + " is not set");
+    }
+
+    return sinkConfigs;
+  }
+
+  /**
+   * Returns the names of the configured sinks, parsed from the comma separated {@code sinks}
+   * value. Surrounding whitespace is trimmed and empty entries are dropped.
+   *
+   * @return the sink names in configuration order.
+   */
+  public List<String> sinks() {
+    return SINK_NAME_SPLITTER.splitToStream(get(SINKS)).collect(Collectors.toList());
+  }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java
new file mode 100644
index 0000000000..ffa7f92058
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageDispatcher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.io.Closeable;
+
+/**
+ * Dispatches lineage events to configured sinks after processing. Implementations should handle
+ * initialization, event processing, and resource cleanup through {@link Closeable}.
+ *
+ * <p>Typical lifecycle:
+ *
+ * <ol>
+ * <li>{@link #initialize(LineageConfig)} with required configurations
+ * <li>Repeated calls to {@link #dispatchLineageEvent(RunEvent)}
+ * <li>{@link #close()} for resource cleanup
+ * </ol>
+ */
+public interface LineageDispatcher extends Closeable {
+
+ /**
+ * Initializes the dispatcher with configuration. Must be called before event dispatching.
+ *
+ * @param lineageConfig configuration for lineage source, processor and sinks.
+ */
+ void initialize(LineageConfig lineageConfig);
+
+ /**
+ * Dispatches a lineage run event to the configured sinks after processing.
+ *
+ * <p>Callers should implement appropriate retry/logging mechanisms for rejected events to
+ * prevent system overload.
+ *
+ * @param runEvent The OpenLineage run event to be processed and dispatched. Must not be null.
+ * @return {@code true} if the event was successfully processed and dispatched to the sinks,
+ * {@code false} if the event was rejected due to the overload of lineage sinks.
+ */
+ boolean dispatchLineageEvent(RunEvent runEvent);
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
new file mode 100644
index 0000000000..2667d3e5ca
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/LineageService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.collect.ImmutableSet;
+import io.openlineage.server.OpenLineage;
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.util.Set;
+import org.apache.gravitino.lineage.processor.LineageProcessor;
+import org.apache.gravitino.lineage.sink.LineageSinkManager;
+import org.apache.gravitino.lineage.source.LineageSource;
+import org.apache.gravitino.server.web.SupportsRESTPackages;
+import org.apache.gravitino.utils.ClassUtils;
+
+/**
+ * The LineageService manages the life cycle of the lineage source, processor and sink manager. It
+ * implements {@link LineageDispatcher} so that the lineage source can hand run events over to the
+ * sinks, and exposes the REST packages of the source when the source provides any.
+ */
+public class LineageService implements LineageDispatcher, SupportsRESTPackages {
+  private LineageSinkManager sinkManager;
+  private LineageSource source;
+  private LineageProcessor processor;
+
+  /**
+   * Instantiates the configured source and processor, creates the sink manager, and initializes
+   * the sink manager before the source so that sinks are ready when the source is initialized.
+   *
+   * @param lineageConfig configuration for lineage source, processor and sinks.
+   */
+  @Override
+  public void initialize(LineageConfig lineageConfig) {
+    String sourceName = lineageConfig.source();
+    String sourceClass = lineageConfig.sourceClass();
+    this.source = ClassUtils.loadClass(sourceClass);
+    this.sinkManager = new LineageSinkManager();
+
+    String processorClassName = lineageConfig.processorClass();
+    this.processor = ClassUtils.loadClass(processorClassName);
+
+    sinkManager.initialize(lineageConfig.sinks(), lineageConfig.getSinkConfigs());
+    source.initialize(lineageConfig.getConfigsWithPrefix(sourceName), this);
+  }
+
+  /** Closes the source first and then the sink manager. Safe to call before initialization. */
+  @Override
+  public void close() {
+    try {
+      if (source != null) {
+        source.close();
+      }
+    } finally {
+      // Release the sinks even if closing the source failed.
+      if (sinkManager != null) {
+        sinkManager.close();
+      }
+    }
+  }
+
+  /**
+   * Processes the run event and forwards the result to the sink manager, unless the sink manager
+   * reports a high watermark. Must only be called after {@link #initialize(LineageConfig)}.
+   *
+   * @param runEvent the OpenLineage run event to process and dispatch.
+   * @return {@code true} if the event was handed to the sink manager, {@code false} if it was
+   *     rejected because the sink manager is at its high watermark.
+   */
+  @Override
+  public boolean dispatchLineageEvent(OpenLineage.RunEvent runEvent) {
+    if (sinkManager.isHighWatermark()) {
+      return false;
+    }
+
+    RunEvent newEvent = processor.process(runEvent);
+    sinkManager.sink(newEvent);
+    return true;
+  }
+
+  /**
+   * Returns the REST packages contributed by the lineage source.
+   *
+   * @return the packages reported by the source if it implements {@link SupportsRESTPackages},
+   *     otherwise an empty set.
+   */
+  @Override
+  public Set<String> getRESTPackages() {
+    if (source instanceof SupportsRESTPackages) {
+      return ((SupportsRESTPackages) source).getRESTPackages();
+    }
+    return ImmutableSet.of();
+  }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
new file mode 100644
index 0000000000..1a08e0cc0d
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/LineageProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.processor;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+
+/** Processes {@link RunEvent} objects to transform or enrich their lineage data. */
+public interface LineageProcessor {
+
+ /**
+ * Processes a run event and returns the resulting event.
+ *
+ * @param runEvent The original run event to process.
+ * @return The processed run event; an implementation may return the instance it was given.
+ */
+ RunEvent process(RunEvent runEvent);
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
new file mode 100644
index 0000000000..cb09eaf705
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/processor/NoopProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.processor;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+
+/** A {@link LineageProcessor} that performs no processing and passes every run event through. */
+public class NoopProcessor implements LineageProcessor {
+
+ /**
+ * Returns the given run event unchanged.
+ *
+ * @param runEvent The run event to pass through.
+ * @return The very same {@code runEvent} instance.
+ */
+ @Override
+ public RunEvent process(RunEvent runEvent) {
+ return runEvent;
+ }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
new file mode 100644
index 0000000000..62367ad234
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.openlineage.server.OpenLineage.Run;
+import io.openlineage.server.OpenLineage.RunEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LineageSink} that serializes each run event to JSON and writes it through a dedicated
+ * logger, so that the logging configuration can route lineage events to their own destination.
+ */
+public class LineageLogSink implements LineageSink {
+  private static final Logger LOG = LoggerFactory.getLogger(LineageLogSink.class);
+
+  // Dates are written as text instead of timestamps, enums in lower case, null fields omitted.
+  private final ObjectMapper objectMapper =
+      JsonMapper.builder()
+          .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+          .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+          .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+          .build()
+          .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+          .registerModule(new JavaTimeModule())
+          .registerModule(new Jdk8Module());
+  private final LineageLogger logger = new LineageLogger();
+
+  /**
+   * Writes lineage events at INFO level to a logger named after this nested class, which keeps
+   * lineage output separate from the regular log output of {@link LineageLogSink}.
+   */
+  private static class LineageLogger {
+    private static final Logger LINEAGE_LOG = LoggerFactory.getLogger(LineageLogger.class);
+
+    public void log(String lineageString) {
+      LINEAGE_LOG.info(lineageString);
+    }
+  }
+
+  /**
+   * Serializes the run event to JSON and logs it. A serialization failure is logged as a warning
+   * and the event is dropped; the failure is not propagated to the caller.
+   *
+   * @param event The lineage run event to log.
+   */
+  @Override
+  public void sink(RunEvent event) {
+    try {
+      logger.log(objectMapper.writeValueAsString(event));
+    } catch (JsonProcessingException e) {
+      // The trailing throwable keeps the stack trace in the log next to the formatted message.
+      LOG.warn(
+          "Process open lineage event failed, run id: {}, error message: {}",
+          getRunId(event),
+          e.getMessage(),
+          e);
+    }
+  }
+
+  /** Returns the run id of the event as text, or "Unknown" if the event carries no run id. */
+  private String getRunId(RunEvent event) {
+    Run run = event.getRun();
+    if (run == null || run.getRunId() == null) {
+      return "Unknown";
+    }
+    return run.getRunId().toString();
+  }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java
new file mode 100644
index 0000000000..b765138516
--- /dev/null
+++ b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSink.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage;
+import java.io.Closeable;
+import java.util.Map;
+
+/** The LineageSink interface defines a closeable component responsible for sinking lineage events. */
+public interface LineageSink extends Closeable {
+
+ /**
+ * Initializes the lineage sink with the provided configuration. The default implementation does
+ * nothing.
+ *
+ * @param configs A map representing the configuration for the sink.
+ */
+ default void initialize(Map<String, String> configs) {}
+
+ /**
+ * Closes the lineage sink and releases associated resources. The default implementation does
+ * nothing.
+ */
+ @Override
+ default void close() {}
+
+ /**
+ * Sinks the given lineage run event.
+ *
+ * @param event The lineage run event to be processed.
+ */
+ void sink(OpenLineage.RunEvent event);
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
new file mode 100644
index 0000000000..f01dcc80e7
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages the lineage sinks and forwards run events to them.
+ *
+ * <p>This class is currently a placeholder: initialization, sinking and closing are no-ops and
+ * {@link #isHighWatermark()} always returns {@code false}.
+ */
+@SuppressWarnings("unused")
+public class LineageSinkManager implements Closeable {
+
+  /**
+   * Initializes the manager with the configured sinks. Currently a no-op.
+   *
+   * @param sinks the names of the configured sinks.
+   * @param lineageConfigs the configuration entries for the sinks.
+   */
+  public void initialize(List<String> sinks, Map<String, String> lineageConfigs) {}
+
+  /**
+   * Checks if the sink queue size surpasses the threshold to avoid overwhelming lineage sinks.
+   *
+   * @return always {@code false} in the current implementation.
+   */
+  public boolean isHighWatermark() {
+    return false;
+  }
+
+  /**
+   * Hands a run event over to the sinks. Currently a no-op.
+   *
+   * @param runEvent the run event to sink.
+   */
+  public void sink(RunEvent runEvent) {}
+
+  /** Closes the manager. Currently a no-op. */
+  @Override
+  public void close() {}
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
new file mode 100644
index 0000000000..fd15eaa57f
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/source/HTTPLineageSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.server.web.SupportsRESTPackages;
+
+/**
+ * The lineage source selected by the source name {@code http}. In its current form it is a stub:
+ * initialization does nothing and no REST packages are exposed.
+ */
+public class HTTPLineageSource implements LineageSource, SupportsRESTPackages {
+ /** Does nothing; both the configuration and the dispatcher are ignored. */
+ @Override
+ public void initialize(Map<String, String> configs, LineageDispatcher
dispatcher) {}
+
+ /**
+ * Returns the REST packages contributed by this source.
+ *
+ * @return an empty set.
+ */
+ @Override
+ public Set<String> getRESTPackages() {
+ return ImmutableSet.of();
+ }
+}
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java
b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java
new file mode 100644
index 0000000000..d633bdbd2d
--- /dev/null
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage.source;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.gravitino.lineage.LineageDispatcher;
+
+/**
+ * The LineageSource interface defines a closeable data source for receiving and dispatching
+ * lineage information.
+ */
+public interface LineageSource extends Closeable {
+
+ /**
+ * Initializes the data source with the given configurations and a lineage dispatcher. The
+ * default implementation does nothing.
+ *
+ * @param configs A map containing configuration information for the data source.
+ * @param dispatcher A dispatcher used to distribute lineage events.
+ */
+ default void initialize(Map<String, String> configs, LineageDispatcher
dispatcher) {}
+
+ /**
+ * Closes the data source and releases related resources. The default implementation does
+ * nothing.
+ */
+ @Override
+ default void close() {}
+}
diff --git
a/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
new file mode 100644
index 0000000000..d4dbfa8a3a
--- /dev/null
+++ b/lineage/src/test/java/org/apache/gravitino/lineage/TestLineageConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lineage;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.lineage.sink.LineageLogSink;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestLineageConfig {
+ // Verifies how LineageConfig resolves the lineage source and sinks, including default values.
+ @Test
+ void testLineageSource() {
+ // default config falls back to the HTTP source
+ LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of());
+ Assertions.assertEquals(LineageConfig.LINEAGE_HTTP_SOURCE_NAME,
lineageConfig.source());
+ Assertions.assertEquals(
+ LineageConfig.LINEAGE_HTTP_SOURCE_CLASS_NAME,
lineageConfig.sourceClass());
+
+ // config with a custom source and its class name
+ lineageConfig =
+ new LineageConfig(
+ ImmutableMap.of(
+ LineageConfig.LINEAGE_CONFIG_SOURCE,
+ "source1",
+ "source1." + LineageConfig.LINEAGE_SOURCE_CLASS_NAME,
+ "test-class"));
+ Assertions.assertEquals("source1", lineageConfig.source());
+ Assertions.assertEquals("test-class", lineageConfig.sourceClass());
+
+ LineageConfig lineageConfig2 =
+ new LineageConfig(ImmutableMap.of(LineageConfig.LINEAGE_CONFIG_SOURCE,
"source2"));
+ // no class name is configured for source2, so resolving the source class must fail
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class, () -> lineageConfig2.sourceClass());
+ }
+
+ @Test
+ void testGetSinkConfigs() {
+ // default config falls back to the log sink
+ LineageConfig lineageConfig = new LineageConfig(ImmutableMap.of());
+ Map<String, String> sinkConfigs = lineageConfig.getSinkConfigs();
+ String sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS);
+ Assertions.assertEquals(LineageConfig.LINEAGE_LOG_SINK_NAME, sinks);
+ String className =
+ sinkConfigs.get(
+ LineageConfig.LINEAGE_LOG_SINK_NAME + "." +
LineageConfig.LINEAGE_SINK_CLASS_NAME);
+ Assertions.assertEquals(LineageLogSink.class.getName(), className);
+
+ // config with multiple sinks, each with its own class name
+ Map<String, String> config2 =
+ ImmutableMap.of(
+ LineageConfig.LINEAGE_CONFIG_SINKS,
+ "sink1,sink2",
+ "sink1." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+ "test-class",
+ "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+ "test-class2");
+ lineageConfig = new LineageConfig(config2);
+ sinkConfigs = lineageConfig.getSinkConfigs();
+ sinks = sinkConfigs.get(LineageConfig.LINEAGE_CONFIG_SINKS);
+ Assertions.assertEquals("sink1,sink2", sinks);
+ Assertions.assertEquals(
+ "test-class", sinkConfigs.get("sink1." +
LineageConfig.LINEAGE_SINK_CLASS_NAME));
+ Assertions.assertEquals(
+ "test-class2", sinkConfigs.get("sink2." +
LineageConfig.LINEAGE_SINK_CLASS_NAME));
+
+ // sink1 is listed but has no class name configured, which must be rejected
+ Map<String, String> config3 =
+ ImmutableMap.of(
+ LineageConfig.LINEAGE_CONFIG_SINKS,
+ "sink1,sink2",
+ "sink2." + LineageConfig.LINEAGE_SINK_CLASS_NAME,
+ "test-class2");
+
+ Assertions.assertThrowsExactly(
+ IllegalArgumentException.class,
+ () -> {
+ LineageConfig lineageConfig1 = new LineageConfig(config3);
+ lineageConfig1.getSinkConfigs();
+ });
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
new file mode 100644
index 0000000000..80c89f05c9
--- /dev/null
+++
b/server-common/src/main/java/org/apache/gravitino/server/web/SupportsRESTPackages.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.server.web;
+
+import java.util.Set;
+
+/**
+ * This interface provides a method to retrieve a set of REST package names.
These package names can
+ * be injected into a Jetty service, which helps the Jetty service locate and handle REST
+ * resources.
+ */
+public interface SupportsRESTPackages {
+ /**
+ * Retrieves a set of REST package names.
+ *
+ * @return A set containing the names of REST packages.
+ */
+ Set<String> getRESTPackages();
+}
diff --git a/server/build.gradle.kts b/server/build.gradle.kts
index 4fe6ae2707..b58e42773d 100644
--- a/server/build.gradle.kts
+++ b/server/build.gradle.kts
@@ -27,6 +27,7 @@ dependencies {
implementation(project(":api"))
implementation(project(":common"))
implementation(project(":core"))
+ implementation(project(":lineage"))
implementation(project(":server-common"))
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
diff --git
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index 0c730439b1..accf65cb5f 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -18,9 +18,8 @@
*/
package org.apache.gravitino.server;
-import com.google.common.collect.Lists;
import java.io.File;
-import java.util.List;
+import java.util.HashSet;
import java.util.Properties;
import javax.servlet.Servlet;
import org.apache.gravitino.Configs;
@@ -33,6 +32,9 @@ import org.apache.gravitino.catalog.SchemaDispatcher;
import org.apache.gravitino.catalog.TableDispatcher;
import org.apache.gravitino.catalog.TopicDispatcher;
import org.apache.gravitino.credential.CredentialOperationDispatcher;
+import org.apache.gravitino.lineage.LineageConfig;
+import org.apache.gravitino.lineage.LineageDispatcher;
+import org.apache.gravitino.lineage.LineageService;
import org.apache.gravitino.metalake.MetalakeDispatcher;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
@@ -75,10 +77,13 @@ public class GravitinoServer extends ResourceConfig {
private final GravitinoEnv gravitinoEnv;
+ private final LineageService lineageService;
+
public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) {
- serverConfig = config;
- server = new JettyServer();
+ this.serverConfig = config;
+ this.server = new JettyServer();
this.gravitinoEnv = gravitinoEnv;
+ this.lineageService = new LineageService();
}
public void initialize() {
@@ -90,6 +95,9 @@ public class GravitinoServer extends ResourceConfig {
ServerAuthenticator.getInstance().initialize(serverConfig);
+ lineageService.initialize(
+ new
LineageConfig(serverConfig.getConfigsWithPrefix(LineageConfig.LINEAGE_CONFIG_PREFIX)));
+
// initialize Jersey REST API resources.
initializeRestApi();
}
@@ -99,9 +107,11 @@ public class GravitinoServer extends ResourceConfig {
}
private void initializeRestApi() {
- List<String> restApiPackages =
Lists.newArrayList("org.apache.gravitino.server.web.rest");
-
restApiPackages.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES));
- packages(restApiPackages.toArray(new String[0]));
+ HashSet<String> restApiPackagesSet = new HashSet<>();
+ restApiPackagesSet.add("org.apache.gravitino.server.web.rest");
+
restApiPackagesSet.addAll(serverConfig.get(Configs.REST_API_EXTENSION_PACKAGES));
+ restApiPackagesSet.addAll(lineageService.getRESTPackages());
+ packages(restApiPackagesSet.toArray(new String[0]));
boolean enableAuthorization =
serverConfig.get(Configs.ENABLE_AUTHORIZATION);
register(
@@ -120,6 +130,7 @@ public class GravitinoServer extends ResourceConfig {
.to(CredentialOperationDispatcher.class)
.ranked(1);
bind(gravitinoEnv.modelDispatcher()).to(ModelDispatcher.class).ranked(1);
+ bind(lineageService).to(LineageDispatcher.class).ranked(1);
}
});
register(JsonProcessingExceptionMapper.class);
@@ -161,6 +172,9 @@ public class GravitinoServer extends ResourceConfig {
public void stop() {
server.stop();
gravitinoEnv.shutdown();
+ if (lineageService != null) { // NOTE(review): field is final and never null
+ lineageService.close();
+ }
}
public static void main(String[] args) {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index c865e14e7a..d4262e69c3 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -82,3 +82,4 @@ include(":bundles:gcp", ":bundles:gcp-bundle")
include(":bundles:aliyun", ":bundles:aliyun-bundle")
include(":bundles:azure", ":bundles:azure-bundle")
include(":catalogs:hadoop-common")
+include(":lineage")