jerryshao commented on code in PR #6782:
URL: https://github.com/apache/gravitino/pull/6782#discussion_r2025746014


##########
lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java:
##########
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import io.openlineage.server.OpenLineage.RunEvent;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class LineageSinkManager implements Closeable {
+
+  public void initialize(List<String> sinks, Map<String, String> LineageConfigs) {}
+
+  // Checks if the sink queue size surpasses the threshold to avoid overwhelming lineage sinks.
+  public boolean isHighWaterMark() {
+    return false;
+  }
+
+  public void sink(RunEvent runEvent) {}

Review Comment:
   Where is the `runEvent` actually dispatched to the different sinks?
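
   For illustration only, the kind of fan-out this question is asking about could look like the sketch below. It is not the PR's implementation: `EventSink` and `FanOutSinkManager` are hypothetical names, and the event type is simplified to `String` so the example needs no OpenLineage dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PR's LineageSink interface; the event is a
// plain String here instead of an OpenLineage RunEvent (an assumption).
interface EventSink {
  void sink(String event);
}

// Hypothetical manager that forwards each event to every registered sink.
class FanOutSinkManager {
  private final List<EventSink> sinks = new ArrayList<>();

  void register(EventSink sink) {
    sinks.add(sink);
  }

  // Delivers the event to each sink in registration order and returns how many
  // sinks accepted it. A failure in one sink does not block the others.
  int sink(String event) {
    int delivered = 0;
    for (EventSink s : sinks) {
      try {
        s.sink(event);
        delivered++;
      } catch (RuntimeException e) {
        System.err.println("Sink failed: " + e.getMessage());
      }
    }
    return delivered;
  }
}
```

   Isolating each sink in its own try/catch is one possible design choice; whether a failing sink should be retried or dropped is left open here.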



##########
lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageLogSinker.java:
##########
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.sink;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.openlineage.server.OpenLineage.RunEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LineageLogSinker implements LineageSink {
+  private static final Logger LOG = LoggerFactory.getLogger(LineageLogSinker.class);
+  private ObjectMapper objectMapper =
+      JsonMapper.builder()
+          .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+          .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+          .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+          .build()
+          .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+          .registerModule(new JavaTimeModule())
+          .registerModule(new Jdk8Module());
+
+  public LineageLogSinker() {}
+
+  @Override
+  public void sink(RunEvent event) {
+    try {
+      LOG.info("Open lineage event:{}", objectMapper.writeValueAsString(event));
+    } catch (Exception e) {
+      LOG.warn("Write open lineage event failed,", e);
+    }
+  }
+}

Review Comment:
   I think we could write lineage events to a separate log file, as we do for audit logs. What do you think?



##########
server/src/main/java/org/apache/gravitino/server/GravitinoServer.java:
##########
@@ -75,10 +77,13 @@ public class GravitinoServer extends ResourceConfig {
 
   private final GravitinoEnv gravitinoEnv;
 
+  private final LineageService lineageService;
+
   public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) {
-    serverConfig = config;
-    server = new JettyServer();
+    this.serverConfig = config;
+    this.server = new JettyServer();
     this.gravitinoEnv = gravitinoEnv;
+    this.lineageService = new LineageService();

Review Comment:
   Would it be better to load the lineage service through auxService? What do you think?



##########
lineage/src/main/java/org/apache/gravitino/lineage/source/LineageSource.java:
##########
@@ -0,0 +1,55 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lineage.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gravitino.lineage.LineageDispatcher;
+
+/**
+ * The LineageSource interface defines a closable data source for receiving and dispatching lineage
+ * information. It provides default methods for initialization, closing, and retrieving a set of
+ * REST packages.
+ */
+public interface LineageSource extends Closeable {
+
+  /**
+   * Initializes the data source with the given configurations and a lineage dispatcher.
+   *
+   * @param configs A map containing configuration information for the data source.
+   * @param dispatcher A dispatcher used to distribute lineage event.
+   */
+  default void initialize(Map<String, String> configs, LineageDispatcher dispatcher) {}
+
+  /** Closes the data source and releases related resources. */
+  @Override
+  default void close() {}
+
+  /**
+   * Retrieves a set of REST packages associated with the data source.
+   *
+   * @return An immutable set containing the names of REST packages.
+   */
+  default Set<String> getRESTPackages() {
+    return ImmutableSet.of();
+  }

Review Comment:
   Why do we need this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
