github-actions[bot] commented on code in PR #61004:
URL: https://github.com/apache/doris/pull/61004#discussion_r2888062571


##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -3603,6 +3607,10 @@ public static int metaServiceRpcRetryTimes() {
     @ConfField(mutable = true)
     public static int audit_event_log_queue_size = 250000;
 
+    @ConfField(mutable = true, description = {"血缘事件队列最大长度,超过长度事件会被舍弃",
+            "Max size of lineage event queue, events will be discarded when exceeded"})
+    public static int lineage_event_queue_size = 50000;

Review Comment:
   **Issue #3 (Low): `lineage_event_queue_size` marked `mutable=true` but is 
effectively immutable**
   
   The `LinkedBlockingDeque` in `LineageEventProcessor` is constructed with
   this config value at field initialization time (lines 70-71 of
   LineageEventProcessor.java). A `LinkedBlockingDeque` fixes its capacity at
   construction and has no `setCapacity()` method, so changing this config at
   runtime via `ADMIN SET FRONTEND CONFIG` has no effect on the actual queue
   capacity.
   
   To make it truly mutable, follow the `AuditEventProcessor` pattern: use an 
unbounded `LinkedBlockingDeque` and check `Config.lineage_event_queue_size` at 
enqueue time in `submitLineageEvent()`:
   ```java
   if (eventQueue.size() >= Config.lineage_event_queue_size) {
       LOG.warn("...");
       return false;
   }
   eventQueue.add(lineageInfo);
   ```
   
   Alternatively, remove `mutable = true` if runtime changes are not intended.
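
A minimal sketch of the enqueue-time check, under stated assumptions: `QueueLimitSketch`, `maxQueueSize`, and `submit()` are hypothetical stand-ins for `LineageEventProcessor`, `Config.lineage_event_queue_size`, and `submitLineageEvent()`, and events are plain strings. It only illustrates that an unbounded deque plus a size check picks up a new limit immediately; it is not the `AuditEventProcessor` code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class QueueLimitSketch {
    // Hypothetical stand-in for Config.lineage_event_queue_size.
    // volatile so that a runtime update is visible to submitting threads.
    static volatile int maxQueueSize = 50000;

    // Unbounded deque: the limit is enforced in submit(), not by the deque itself.
    private final BlockingQueue<String> eventQueue = new LinkedBlockingDeque<>();

    /** Returns true if the event was accepted, false if it was discarded. */
    boolean submit(String event) {
        if (event == null) {
            return false;
        }
        // The limit is re-read on every call, so a config change applies at once.
        if (eventQueue.size() >= maxQueueSize) {
            return false;
        }
        return eventQueue.offer(event);
    }

    int size() {
        return eventQueue.size();
    }
}
```

Because `size()` followed by `offer()` is not atomic, concurrent submitters can overshoot the limit by a few entries. That is usually acceptable for a soft cap like this one.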



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEventProcessor.java:
##########
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.lineage;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.extension.loader.ClassLoadingPolicy;
+import org.apache.doris.extension.loader.DirectoryPluginRuntimeManager;
+import org.apache.doris.extension.loader.LoadFailure;
+import org.apache.doris.extension.loader.LoadReport;
+import org.apache.doris.extension.loader.PluginHandle;
+import org.apache.doris.extension.spi.PluginContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Processor that queues lineage events and dispatches them to lineage plugins.
+ * <p>
+ * Plugins are discovered via two mechanisms (aligned with
+ * {@code AuthenticationPluginManager} pattern):
+ * <ol>
+ *   <li>Built-in: {@link ServiceLoader} on classpath</li>
+ *   <li>External: {@link DirectoryPluginRuntimeManager} from
+ *       {@code $plugin_dir/lineage/} directory</li>
+ * </ol>
+ * </p>
+ */
+public class LineageEventProcessor {
+
+    private static final Logger LOG = 
LogManager.getLogger(LineageEventProcessor.class);
+    private static final long EVENT_POLL_TIMEOUT_SECONDS = 5L;
+
+    /** Parent-first prefixes for child-first classloading isolation. */
+    private static final List<String> LINEAGE_PARENT_FIRST_PREFIXES =
+            Collections.singletonList("org.apache.doris.nereids.lineage.");
+
+    private final AtomicReference<List<LineagePlugin>> lineagePlugins = new 
AtomicReference<>(Collections.emptyList());
+    private final BlockingQueue<LineageInfo> eventQueue =
+            new LinkedBlockingDeque<>(Config.lineage_event_queue_size);
+    private final AtomicBoolean isInit = new AtomicBoolean(false);
+
+    /** Factory registry by plugin name (like 
AuthenticationPluginManager.factories). */
+    private final Map<String, LineagePluginFactory> factories = new 
ConcurrentHashMap<>();
+    private final DirectoryPluginRuntimeManager<LineagePluginFactory> 
runtimeManager =
+            new DirectoryPluginRuntimeManager<>();
+
+    private Thread workerThread;
+
+    /**
+     * Create a lineage event processor.
+     */
+    public LineageEventProcessor() {
+    }
+
+    /**
+     * Start the background worker thread.
+     */
+    public void start() {
+        if (!isInit.compareAndSet(false, true)) {
+            return;
+        }
+        discoverPlugins();
+        workerThread = new Thread(new Worker(), "LineageEventProcessor");
+        workerThread.setDaemon(true);
+        workerThread.start();
+    }
+
+    /**
+     * Discover lineage plugins via dual mechanism:
+     * 1. ServiceLoader for built-in (classpath) factories
+     * 2. DirectoryPluginRuntimeManager for external (directory) plugins
+     *
+     * <p>Aligned with {@code AuthenticationPluginManager} pattern.
+     */
+    private void discoverPlugins() {
+        // 1. Built-in discovery (classpath ServiceLoader)
+        try {
+            ServiceLoader<LineagePluginFactory> serviceLoader = 
ServiceLoader.load(LineagePluginFactory.class);
+            Iterator<LineagePluginFactory> iterator = serviceLoader.iterator();
+            while (true) {
+                LineagePluginFactory factory;
+                try {
+                    if (!iterator.hasNext()) {
+                        break;
+                    }
+                    factory = iterator.next();
+                } catch (ServiceConfigurationError e) {
+                    LOG.warn("Failed to load built-in lineage plugin factory 
from ServiceLoader, skip provider", e);
+                    continue;
+                }
+                String pluginName = safeFactoryName(factory);
+                if (pluginName.isEmpty()) {
+                    LOG.warn("Skip built-in lineage plugin factory with empty 
name: {}",
+                            factory == null ? "null" : 
factory.getClass().getName());
+                    continue;
+                }
+                LineagePluginFactory existing = 
factories.putIfAbsent(pluginName, factory);
+                if (existing != null) {
+                    LOG.warn("Skip duplicated built-in lineage plugin name: 
{}", pluginName);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to discover built-in lineage plugin factories via 
ServiceLoader", e);
+        }
+
+        // 2. External discovery (plugin_dir/lineage/ directory)
+        try {
+            List<Path> pluginRoots = Collections.singletonList(
+                    Paths.get(Config.plugin_dir, "lineage"));
+            ClassLoadingPolicy policy = new 
ClassLoadingPolicy(LINEAGE_PARENT_FIRST_PREFIXES);
+            LoadReport<LineagePluginFactory> report = runtimeManager.loadAll(
+                    pluginRoots, getClass().getClassLoader(),
+                    LineagePluginFactory.class, policy);
+
+            for (LoadFailure failure : report.getFailures()) {
+                LOG.warn("Skip lineage plugin directory due to load failure: 
pluginDir={}, stage={}, message={}",
+                        failure.getPluginDir(), failure.getStage(), 
failure.getMessage(), failure.getCause());
+            }
+
+            for (PluginHandle<LineagePluginFactory> handle : 
report.getSuccesses()) {
+                String pluginName = handle.getPluginName();
+                LineagePluginFactory existing = 
factories.putIfAbsent(pluginName, handle.getFactory());
+                if (existing != null) {
+                    LOG.warn("Skip duplicated lineage plugin name: {} from 
directory {}", pluginName,
+                            handle.getPluginDir());
+                } else {
+                    LOG.info("Loaded external lineage plugin factory: name={}, 
pluginDir={}, jarCount={}",
+                            pluginName, handle.getPluginDir(), 
handle.getResolvedJars().size());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to discover external lineage plugins from plugin 
directory", e);
+        }
+
+        // 3. Create and initialize plugin instances from all discovered factories
+        List<LineagePlugin> plugins = new ArrayList<>();
+        for (Map.Entry<String, LineagePluginFactory> entry : factories.entrySet()) {
+            String pluginName = entry.getKey();
+            try {

Review Comment:
   **Issue #1 (Medium): `Config.activate_lineage_plugin` is not enforced at 
framework level**
   
   All discovered plugin factories are instantiated unconditionally here. The
   `activate_lineage_plugin` config is documented as the mechanism for selecting
   which plugins are active, but the framework never checks it; filtering is
   delegated entirely to each plugin's `eventFilter()` implementation.
   
   This means:
   1. Plugins are loaded and initialized even when not listed in 
`activate_lineage_plugin`
   2. A buggy plugin that returns `true` from `eventFilter()` regardless of 
config will receive events even when not configured
   3. There is no built-in way to disable a misbehaving plugin without removing 
its JAR
   
   Suggestion: either filter factories by `activate_lineage_plugin` before
   instantiation, or provide a default `eventFilter()` implementation in the
   framework that checks the config. For the first option:
   ```java
   // In discoverPlugins(), before creating plugin
   // (needs java.util.Arrays, java.util.HashSet, java.util.Set):
   Set<String> activeNames = new HashSet<>(Arrays.asList(Config.activate_lineage_plugin));
   if (!activeNames.isEmpty() && !activeNames.contains(pluginName)) {
       LOG.info("Skip lineage plugin not in activate_lineage_plugin: {}", pluginName);
       continue;
   }
   ```
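
The filtering step can also be sketched as a standalone helper, which makes it easy to unit test. Everything here is an assumption for illustration: `PluginActivationSketch` and `selectActive()` are hypothetical names, the factory type is left generic, and the config is assumed to be a `String[]` as in the snippet above. An empty selection keeps every discovered factory, mirroring the `isEmpty()` guard.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class PluginActivationSketch {
    private PluginActivationSketch() {
    }

    /**
     * Hypothetical helper: returns the discovered factories whose name is listed
     * in activeNames. Blank names are ignored; if no names remain, all are kept.
     */
    static <F> Map<String, F> selectActive(Map<String, F> discovered, String[] activeNames) {
        Set<String> active = new HashSet<>();
        if (activeNames != null) {
            for (String name : activeNames) {
                if (name != null && !name.trim().isEmpty()) {
                    active.add(name.trim());
                }
            }
        }
        // LinkedHashMap keeps the discovery order of the input map.
        Map<String, F> selected = new LinkedHashMap<>();
        for (Map.Entry<String, F> entry : discovered.entrySet()) {
            if (active.isEmpty() || active.contains(entry.getKey())) {
                selected.put(entry.getKey(), entry.getValue());
            }
        }
        return selected;
    }
}
```

Whether an empty `activate_lineage_plugin` should mean "all plugins" or "no plugins" is a design decision for the PR author; this sketch simply follows the guard in the suggestion.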



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageEventProcessor.java:
##########
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.lineage;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.extension.loader.ClassLoadingPolicy;
+import org.apache.doris.extension.loader.DirectoryPluginRuntimeManager;
+import org.apache.doris.extension.loader.LoadFailure;
+import org.apache.doris.extension.loader.LoadReport;
+import org.apache.doris.extension.loader.PluginHandle;
+import org.apache.doris.extension.spi.PluginContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Processor that queues lineage events and dispatches them to lineage plugins.
+ * <p>
+ * Plugins are discovered via two mechanisms (aligned with
+ * {@code AuthenticationPluginManager} pattern):
+ * <ol>
+ *   <li>Built-in: {@link ServiceLoader} on classpath</li>
+ *   <li>External: {@link DirectoryPluginRuntimeManager} from
+ *       {@code $plugin_dir/lineage/} directory</li>
+ * </ol>
+ * </p>
+ */
+public class LineageEventProcessor {
+
+    private static final Logger LOG = 
LogManager.getLogger(LineageEventProcessor.class);
+    private static final long EVENT_POLL_TIMEOUT_SECONDS = 5L;
+
+    /** Parent-first prefixes for child-first classloading isolation. */
+    private static final List<String> LINEAGE_PARENT_FIRST_PREFIXES =
+            Collections.singletonList("org.apache.doris.nereids.lineage.");
+
+    private final AtomicReference<List<LineagePlugin>> lineagePlugins = new 
AtomicReference<>(Collections.emptyList());
+    private final BlockingQueue<LineageInfo> eventQueue =
+            new LinkedBlockingDeque<>(Config.lineage_event_queue_size);
+    private final AtomicBoolean isInit = new AtomicBoolean(false);
+
+    /** Factory registry by plugin name (like 
AuthenticationPluginManager.factories). */
+    private final Map<String, LineagePluginFactory> factories = new 
ConcurrentHashMap<>();
+    private final DirectoryPluginRuntimeManager<LineagePluginFactory> 
runtimeManager =
+            new DirectoryPluginRuntimeManager<>();
+
+    private Thread workerThread;
+
+    /**
+     * Create a lineage event processor.
+     */
+    public LineageEventProcessor() {
+    }
+
+    /**
+     * Start the background worker thread.
+     */
+    public void start() {
+        if (!isInit.compareAndSet(false, true)) {
+            return;
+        }
+        discoverPlugins();
+        workerThread = new Thread(new Worker(), "LineageEventProcessor");
+        workerThread.setDaemon(true);
+        workerThread.start();
+    }
+
+    /**
+     * Discover lineage plugins via dual mechanism:
+     * 1. ServiceLoader for built-in (classpath) factories
+     * 2. DirectoryPluginRuntimeManager for external (directory) plugins
+     *
+     * <p>Aligned with {@code AuthenticationPluginManager} pattern.
+     */
+    private void discoverPlugins() {
+        // 1. Built-in discovery (classpath ServiceLoader)
+        try {
+            ServiceLoader<LineagePluginFactory> serviceLoader = 
ServiceLoader.load(LineagePluginFactory.class);
+            Iterator<LineagePluginFactory> iterator = serviceLoader.iterator();
+            while (true) {
+                LineagePluginFactory factory;
+                try {
+                    if (!iterator.hasNext()) {
+                        break;
+                    }
+                    factory = iterator.next();
+                } catch (ServiceConfigurationError e) {
+                    LOG.warn("Failed to load built-in lineage plugin factory 
from ServiceLoader, skip provider", e);
+                    continue;
+                }
+                String pluginName = safeFactoryName(factory);
+                if (pluginName.isEmpty()) {
+                    LOG.warn("Skip built-in lineage plugin factory with empty 
name: {}",
+                            factory == null ? "null" : 
factory.getClass().getName());
+                    continue;
+                }
+                LineagePluginFactory existing = 
factories.putIfAbsent(pluginName, factory);
+                if (existing != null) {
+                    LOG.warn("Skip duplicated built-in lineage plugin name: 
{}", pluginName);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to discover built-in lineage plugin factories via 
ServiceLoader", e);
+        }
+
+        // 2. External discovery (plugin_dir/lineage/ directory)
+        try {
+            List<Path> pluginRoots = Collections.singletonList(
+                    Paths.get(Config.plugin_dir, "lineage"));
+            ClassLoadingPolicy policy = new 
ClassLoadingPolicy(LINEAGE_PARENT_FIRST_PREFIXES);
+            LoadReport<LineagePluginFactory> report = runtimeManager.loadAll(
+                    pluginRoots, getClass().getClassLoader(),
+                    LineagePluginFactory.class, policy);
+
+            for (LoadFailure failure : report.getFailures()) {
+                LOG.warn("Skip lineage plugin directory due to load failure: 
pluginDir={}, stage={}, message={}",
+                        failure.getPluginDir(), failure.getStage(), 
failure.getMessage(), failure.getCause());
+            }
+
+            for (PluginHandle<LineagePluginFactory> handle : 
report.getSuccesses()) {
+                String pluginName = handle.getPluginName();
+                LineagePluginFactory existing = 
factories.putIfAbsent(pluginName, handle.getFactory());
+                if (existing != null) {
+                    LOG.warn("Skip duplicated lineage plugin name: {} from 
directory {}", pluginName,
+                            handle.getPluginDir());
+                } else {
+                    LOG.info("Loaded external lineage plugin factory: name={}, 
pluginDir={}, jarCount={}",
+                            pluginName, handle.getPluginDir(), 
handle.getResolvedJars().size());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to discover external lineage plugins from plugin 
directory", e);
+        }
+
+        // 3. Create and initialize plugin instances from all discovered factories
+        List<LineagePlugin> plugins = new ArrayList<>();
+        for (Map.Entry<String, LineagePluginFactory> entry : factories.entrySet()) {
+            String pluginName = entry.getKey();
+            try {
+                Map<String, String> props = new HashMap<>();
+                props.put("plugin.path", resolvePluginPath(pluginName));
+                props.put("plugin.name", pluginName);
+                PluginContext context = new PluginContext(props);
+                LineagePlugin plugin = entry.getValue().create(context);
+                if (plugin != null) {
+                    plugin.initialize(context);
+                    plugins.add(plugin);
+                    LOG.info("Loaded lineage plugin: {}, pluginPath={}", 
pluginName, props.get("plugin.path"));
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to create/initialize lineage plugin: {}", 
pluginName, e);
+            }
+        }
+        refreshPlugins(plugins);
+    }
+
+    private String safeFactoryName(LineagePluginFactory factory) {
+        if (factory == null) {
+            return "";
+        }
+        try {
+            String name = factory.name();
+            return name == null ? "" : name.trim();
+        } catch (Throwable t) {
+            LOG.warn("Failed to get lineage plugin factory name, skip factory 
class={}",
+                    factory.getClass().getName(), t);
+            return "";
+        }
+    }
+
+    /**
+     * Resolve plugin path: prefer the directory from 
DirectoryPluginRuntimeManager,
+     * fallback to convention path.
+     */
+    private String resolvePluginPath(String pluginName) {
+        return runtimeManager.get(pluginName)
+                .map(handle -> handle.getPluginDir().toString())
+                .orElse(Config.plugin_dir + "/lineage/" + pluginName);
+    }
+
+    /**
+     * Update the active lineage plugin list.
+     */
+    public void refreshPlugins(List<LineagePlugin> plugins) {
+        List<LineagePlugin> safePlugins = plugins == null ? 
Collections.emptyList() : plugins;
+        lineagePlugins.set(safePlugins);
+        if (safePlugins.isEmpty()) {
+            clearPendingEvents();
+        }
+    }
+
+    /**
+     * Returns true when at least one loaded plugin is currently willing to 
receive lineage events.
+     */
+    public boolean hasActivePlugins() {
+        List<LineagePlugin> plugins = lineagePlugins.get();
+        if (plugins.isEmpty()) {
+            return false;
+        }
+        for (LineagePlugin plugin : plugins) {
+            if (plugin == null) {
+                continue;
+            }
+            try {
+                if (plugin.eventFilter()) {
+                    return true;
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to evaluate lineage plugin event filter: {}", 
plugin.getClass().getName(), t);
+            }
+        }
+        return false;
+    }
+
+    private void clearPendingEvents() {
+        int dropped = 0;
+        while (eventQueue.poll() != null) {
+            dropped++;
+        }
+        if (dropped > 0) {
+            LOG.warn("Lineage event queue cleared because no active plugins. 
dropped={}", dropped);
+        }
+    }
+
+    /**
+     * Submit a lineage event to the processing queue.
+     *
+     * @param lineageInfo lineage info to submit
+     * @return true if accepted, false otherwise
+     */
+    public boolean submitLineageEvent(LineageInfo lineageInfo) {
+        if (lineageInfo == null) {
+            return false;
+        }
+        try {
+            if (!eventQueue.offer(lineageInfo)) {
+                String queryId = getQueryId(lineageInfo);
+                LOG.warn("the lineage event queue is full with size {}, 
discard the lineage event: {}",
+                        eventQueue.size(), queryId);
+                return false;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Lineage event enqueued: queryId={}, queueSize={}",
+                        getQueryId(lineageInfo), eventQueue.size());
+            }
+            return true;
+        } catch (Exception e) {
+            String queryId = getQueryId(lineageInfo);
+            LOG.warn("encounter exception when handle lineage event {}, 
discard the event",
+                    queryId, e);
+            return false;
+        }
+    }
+
+    /**
+     * Worker that polls events and invokes lineage plugins.
+     */
+    public class Worker implements Runnable {
+        /**
+         * Run the lineage processing loop.
+         */
+        @Override
+        public void run() {
+            LineageInfo lineageInfo;
+            while (true) {
+                List<LineagePlugin> currentPlugins = lineagePlugins.get();
+
+                try {

Review Comment:
   **Issue #2 (Medium): Worker reads stale plugin list and has no graceful 
shutdown**
   
   Two issues here:
   
   1. **Stale plugin list**: `currentPlugins` is read at line 298 BEFORE 
`eventQueue.poll()` at line 301, which blocks for up to 5 seconds. If 
`refreshPlugins()` is called during that wait, the event will be dispatched to 
the old plugin list. The fix is to read `lineagePlugins.get()` AFTER 
successfully polling an event:
   ```java
   while (true) {
       try {
           lineageInfo = eventQueue.poll(EVENT_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
           if (lineageInfo == null) {
               continue;
           }
       } catch (InterruptedException e) {
           LOG.warn("...", e);
           continue;
       }
       List<LineagePlugin> currentPlugins = lineagePlugins.get(); // Move here
       for (LineagePlugin lineagePlugin : currentPlugins) {
           ...
       }
   }
   ```
   
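   2. **No graceful shutdown**: The `while(true)` loop has no exit condition.
   When FE shuts down, the daemon thread is killed abruptly. Consider adding a
   `volatile boolean running` flag, looping on `while (running)`, and adding a
   `stop()` method that sets the flag to false and interrupts the worker thread,
   so the loop exits and plugins can be closed cleanly. The
   `InterruptedException` handler must then fall through to the loop condition
   rather than retry unconditionally, or the interrupt from `stop()` is
   swallowed.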
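
Both points can be combined in one small sketch. The names are hypothetical (`StoppableWorkerSketch`, `submit()`, `stop(long)`), plugins are modeled as `Consumer<String>`, and events are plain strings; the real `LineagePlugin` API and any close hook it offers are not assumed here.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class StoppableWorkerSketch {
    private static final long POLL_TIMEOUT_SECONDS = 1L;

    private final BlockingQueue<String> eventQueue = new LinkedBlockingDeque<>();
    private final AtomicReference<List<Consumer<String>>> plugins;
    private final Thread worker;
    private volatile boolean running = true;

    StoppableWorkerSketch(List<Consumer<String>> initialPlugins) {
        this.plugins = new AtomicReference<>(initialPlugins);
        this.worker = new Thread(this::runLoop, "LineageWorkerSketch");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    void refreshPlugins(List<Consumer<String>> newPlugins) {
        plugins.set(newPlugins);
    }

    boolean submit(String event) {
        return eventQueue.offer(event);
    }

    /** Clears the flag, wakes the worker out of poll(), and waits for it to exit. */
    void stop(long joinMillis) {
        running = false;
        worker.interrupt();
        try {
            worker.join(joinMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // A real implementation could close its plugins here (hypothetical step).
    }

    boolean isAlive() {
        return worker.isAlive();
    }

    private void runLoop() {
        while (running) {
            String event;
            try {
                event = eventQueue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                continue; // the loop condition re-checks the running flag
            }
            if (event == null) {
                continue;
            }
            // Read the plugin list only after an event is in hand, so a refresh
            // that happened while poll() was blocked is honored.
            for (Consumer<String> plugin : plugins.get()) {
                try {
                    plugin.accept(event);
                } catch (RuntimeException ex) {
                    // Isolate plugin failures so one plugin cannot stop the loop.
                }
            }
        }
    }
}
```

Events still queued when `stop()` is called are dropped in this sketch. Draining them before exit would be a separate decision.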



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageUtils.java:
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.lineage;
+
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility methods for lineage event construction and filtering.
+ */
+public final class LineageUtils {
+
+    public static final Logger LOG = LogManager.getLogger(LineageUtils.class);
+    private static final String EMPTY_STRING = "";
+    private static final String CATALOG_TYPE_KEY = "type";
+    private static final int NO_PLUGINS = 0;
+    private static final long UNKNOWN_START_TIME_MS = 0L;
+
+    private LineageUtils() {
+    }
+
+    /**
+     * Check whether the parsed statement matches the current command type.
+     *
+     * @param executor statement executor containing parsed statement
+     * @param currentCommand current command class
+     * @return true if parsed command matches current command
+     */
+    public static boolean isSameParsedCommand(StmtExecutor executor, Class<? 
extends Command> currentCommand) {
+        if (executor == null || currentCommand == null) {
+            return false;
+        }
+        StatementBase parsedStmt = executor.getParsedStmt();
+        if (!(parsedStmt instanceof LogicalPlanAdapter)) {
+            return false;
+        }
+        Plan parsedPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+        if (!(parsedPlan instanceof Command)) {
+            return false;
+        }
+        return parsedPlan.getClass().equals(currentCommand);
+    }
+
+    /**
+     * Build lineage info and compute lineage context if lineage plugins are 
enabled.
+     *
+     * @param plan the plan to extract lineage from
+     * @param sourceCommand the command type for the event
+     * @param ctx connect context holding query metadata
+     * @param executor statement executor for query text
+     */
+    public static LineageInfo buildLineageInfo(Plan plan, Class<? extends 
Command> sourceCommand,
+                                               ConnectContext ctx, 
StmtExecutor executor) {
+        if (plan == null || ctx == null) {
+            return null;
+        }
+        long startNs = 0L;
+        if (LOG.isDebugEnabled()) {
+            startNs = System.nanoTime();
+        }
+        long eventTimeMs = System.currentTimeMillis();
+        long durationMs = LineageContext.UNKNOWN_DURATION_MS;
+        long startTimeMs = ctx.getStartTime();
+        if (startTimeMs > UNKNOWN_START_TIME_MS) {
+            long elapsed = eventTimeMs - startTimeMs;
+            durationMs = elapsed >= 0 ? elapsed : 
LineageContext.UNKNOWN_DURATION_MS;
+        }
+        LineageInfo lineageInfo = 
LineageInfoExtractor.extractLineageInfo(plan);
+        LineageContext context = buildLineageContext(sourceCommand, ctx, 
executor, eventTimeMs, durationMs);
+        String catalog = safeString(ctx.getDefaultCatalog());
+        context.setCatalog(catalog);
+        
context.setExternalCatalogProperties(collectExternalCatalogProperties(lineageInfo));
+        lineageInfo.setContext(context);
+        if (LOG.isDebugEnabled()) {
+            Map<?, ?> directMap = lineageInfo.getDirectLineageMap();
+            Object indirectMap = lineageInfo.getInDirectLineageMapByDataset();
+            Object tableLineage = lineageInfo.getTableLineageSet();
+            Object targetColumns = lineageInfo.getTargetColumns();
+            String targetTable = lineageInfo.getTargetTable() == null
+                    ? "null"
+                    : lineageInfo.getTargetTable().getName();
+            int externalCatalogs = context.getExternalCatalogProperties() == 
null
+                    ? 0
+                    : context.getExternalCatalogProperties().size();
+            long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
+            LOG.debug("Lineage info built: plan={}, targetTable={}, 
targetColumns={}, directMap={},"
+                            + " indirectMap={}, tableLineage={}, 
externalCatalogs={}, elapsedMs={}",
+                    plan.getClass().getSimpleName(), targetTable, 
targetColumns, directMap, indirectMap,
+                    tableLineage, externalCatalogs, elapsedMs);
+        }
+        return lineageInfo;
+    }
+
+    /**
+     * Submit lineage event if lineage plugins are enabled and command matches 
parsed statement.
+     *
+     * @param executor statement executor containing parsed statement
+     * @param lineagePlan optional lineage plan to use instead of current plan
+     * @param currentPlan current logical plan
+     * @param currentHandleClass current command class
+     */
+    public static void submitLineageEventIfNeeded(StmtExecutor executor, 
Optional<Plan> lineagePlan,
+                                                  LogicalPlan currentPlan,
+                                                  Class<? extends Command> 
currentHandleClass) {
+        if (!isLineagePluginConfigured()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip lineage: no plugin configured");
+            }
+            return;
+        }
+        if (!LineageUtils.isSameParsedCommand(executor, currentHandleClass)) {
+            if (LOG.isDebugEnabled()) {
+                String parsedCommand = executor == null || executor.getParsedStmt() == null
+                        ? "null"
+                        : executor.getParsedStmt().getClass().getSimpleName();
+                LOG.debug("Skip lineage: parsed command mismatch, parsed={}, current={}",
+                        parsedCommand, currentHandleClass == null ? "null" : currentHandleClass.getSimpleName());
+            }
+            return;
+        }
+        Plan plan = lineagePlan.orElse(currentPlan);
+        if (plan == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip lineage: plan is null");
+            }
+            return;
+        }
+        boolean valuesOnly = isValuesOnly(plan);
+        boolean internalTarget = !valuesOnly && isInternalSchemaTarget(plan);
+        if (shouldSkipLineage(plan)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip lineage: valuesOnly={}, internalSchemaTarget={}, plan={}",
+                        valuesOnly, internalTarget, plan.getClass().getSimpleName());
+            }
+            return;
+        }
+        try {
+            LineageInfo lineageInfo = LineageUtils.buildLineageInfo(plan, currentHandleClass,
+                    executor.getContext(), executor);
+            if (lineageInfo != null) {
+                if (LOG.isDebugEnabled()) {
+                    LineageContext context = lineageInfo.getContext();
+                    LOG.debug("Submit lineage: queryId={}, plan={}",
+                            context == null ? "" : context.getQueryId(),
+                            plan.getClass().getSimpleName());
+                }
+                Env.getCurrentEnv().getLineageEventProcessor().submitLineageEvent(lineageInfo);
+            }
+        } catch (Exception e) {
+            // Log and ignore exceptions during lineage processing to avoid impacting query execution
+            LOG.error("Failed to submit lineage event", e);
+        }
+    }
+
+    public static boolean shouldSkipLineage(Plan plan) {
+        return plan == null || isValuesOnly(plan) || isInternalSchemaTarget(plan);
+    }
+
+    private static boolean isValuesOnly(Plan plan) {
+        if (plan.containsType(LogicalCatalogRelation.class)) {
+            return false;
+        }
+        return plan.containsType(InlineTable.class, LogicalUnion.class, LogicalOneRowRelation.class);
+    }
+
+    private static boolean isInternalSchemaTarget(Plan plan) {
+        Optional<LogicalTableSink> sink = plan.collectFirst(node -> node instanceof LogicalTableSink);
+        if (!sink.isPresent()) {
+            return false;
+        }
+        TableIf targetTable = sink.get().getTargetTable();
+        if (targetTable == null || targetTable.getDatabase() == null
+                || targetTable.getDatabase().getCatalog() == null) {
+            return false;
+        }
+        String catalogName = targetTable.getDatabase().getCatalog().getName();
+        String dbName = targetTable.getDatabase().getFullName();
+        return InternalCatalog.INTERNAL_CATALOG_NAME.equalsIgnoreCase(catalogName)
+                && FeConstants.INTERNAL_DB_NAME.equalsIgnoreCase(dbName);
+    }
+
+    private static Map<String, Map<String, String>> collectExternalCatalogProperties(LineageInfo lineageInfo) {
+        if (lineageInfo == null) {
+            return Collections.emptyMap();
+        }
+        Set<TableIf> tableLineageSet = lineageInfo.getTableLineageSet();
+        TableIf targetTable = lineageInfo.getTargetTable();
+        int tableCount = (tableLineageSet == null ? 0 : tableLineageSet.size()) + (targetTable == null ? 0 : 1);
+        Set<TableIf> tables = new HashSet<>(Math.max(tableCount, 1));
+        if (tableLineageSet != null) {
+            tables.addAll(tableLineageSet);
+        }
+        if (targetTable != null) {
+            tables.add(targetTable);
+        }
+        if (tables.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        Map<String, Map<String, String>> externalCatalogs = new LinkedHashMap<>();
+        for (TableIf table : tables) {
+            CatalogIf<?> catalog = getCatalog(table);
+            if (catalog == null) {
+                continue;
+            }
+            if (catalog.isInternalCatalog()) {
+                continue;
+            }
+            String catalogName = catalog.getName();
+            if (externalCatalogs.containsKey(catalogName)) {
+                continue;
+            }
+            Map<String, String> properties = new LinkedHashMap<>();
+            if (catalog.getType() != null) {
+                properties.put(CATALOG_TYPE_KEY, catalog.getType());
+            }
+            properties.putAll(sanitizeCatalogProperties(catalog));
+            externalCatalogs.put(catalogName, properties);
+        }
+        return externalCatalogs;
+    }
+
+    private static Map<String, String> sanitizeCatalogProperties(CatalogIf<?> catalog) {
+        if (catalog == null || catalog.getProperties() == null) {
+            return Collections.emptyMap();
+        }
+        Map<String, String> sanitized = new LinkedHashMap<>(catalog.getProperties().size());
+        if (catalog.getProperties().isEmpty()) {
+            return sanitized;
+        }
+        for (Map.Entry<String, String> entry : catalog.getProperties().entrySet()) {
+            String key = entry.getKey();
+            if (key == null) {
+                continue;
+            }
+            if (PrintableMap.HIDDEN_KEY.contains(key) || PrintableMap.SENSITIVE_KEY.contains(key)) {
+                continue;
+            }
+            if (catalog instanceof ExternalCatalog && ExternalCatalog.HIDDEN_PROPERTIES.contains(key)) {
+                continue;
+                continue;
+            }
+            sanitized.put(key, entry.getValue());
+        }
+        return sanitized;
+    }
+
+    private static LineageContext buildLineageContext(Class<? extends Command> sourceCommand, ConnectContext ctx,
+            StmtExecutor executor, long timestampMs, long durationMs) {
+        String queryId = ctx.queryId() == null ? EMPTY_STRING : DebugUtil.printId(ctx.queryId());
+        String queryText = executor == null ? EMPTY_STRING : executor.getOriginStmtInString();
+        Map<String, String> connectAttributes = ctx.getConnectAttributes();
+        String scheduleInfo = connectAttributes == null ? EMPTY_STRING : connectAttributes.get("scheduleInfo");
+        if (scheduleInfo != null && !scheduleInfo.isEmpty()) {
+            queryText = "/* scheduleInfo=" + scheduleInfo + " */ " + queryText;
+        }
+        String user = safeString(ctx.getQualifiedUser());
+        String database = safeString(ctx.getDatabase());
+        LineageContext lineageContext =
+                new LineageContext(sourceCommand, queryId, queryText, user, database, timestampMs, durationMs);
+        lineageContext.setClientIp(safeString(ctx.getClientIP()));
+        lineageContext.setState(ctx.getState() == null ? EMPTY_STRING : ctx.getState().getStateType().name());
+        return lineageContext;
+    }
+
+    private static boolean isLineagePluginConfigured() {
+        LineageEventProcessor processor = Env.getCurrentEnv().getLineageEventProcessor();
+        return processor != null && processor.hasActivePlugins();
+    }

Review Comment:
   **Issue #4 (Medium): `isLineagePluginConfigured()` iterates all plugins on every DML execution**
   
   `hasActivePlugins()` is called on every `INSERT INTO SELECT`, `INSERT 
OVERWRITE`, and `CTAS` execution. It iterates through all loaded plugins and 
calls `eventFilter()` on each one. For a system processing thousands of queries 
per second, this adds unnecessary overhead to the hot path.
   
   Consider caching the result with a short TTL (e.g., 5-10 seconds) or using a
   simple `volatile boolean` flag that is updated when plugins are refreshed or
   config changes. For reference, the current implementation is:
   ```java
   private static boolean isLineagePluginConfigured() {
       LineageEventProcessor processor = Env.getCurrentEnv().getLineageEventProcessor();
       // hasActivePlugins() iterates all plugins on every call
       return processor != null && processor.hasActivePlugins();
   }
   ```
   
   Alternatively, since `activate_lineage_plugin` defaults to empty, a quick 
check of `Config.activate_lineage_plugin.length > 0` before calling 
`hasActivePlugins()` would be a cheap fast-path exit for the common case of no 
lineage plugins.
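   
   A minimal sketch of the `volatile boolean` option, for illustration only. `ActivePluginFlag` and `FilterablePlugin` are hypothetical names that do not exist in this PR, and the real refresh hook would sit wherever plugins are loaded or the config is changed:
   ```java
   import java.util.List;

   // Hypothetical stand-in for LineagePlugin; only eventFilter() is modeled here.
   interface FilterablePlugin {
       boolean eventFilter();
   }

   // Hypothetical holder that caches "is any plugin active?" in a volatile flag.
   final class ActivePluginFlag {
       private volatile boolean anyActive = false;

       // Slow path: call when plugins are (re)loaded or the related config changes.
       void refresh(List<? extends FilterablePlugin> loadedPlugins) {
           boolean active = false;
           for (FilterablePlugin plugin : loadedPlugins) {
               if (plugin.eventFilter()) {
                   active = true;
                   break;
               }
           }
           anyActive = active;
       }

       // Hot path: a single volatile read, no iteration over plugins.
       boolean hasActivePlugins() {
           return anyActive;
       }
   }
   ```
   With this shape the query path pays for one volatile read and the plugin scan runs only on refresh. The trade-off is that a change in a plugin's `eventFilter()` result is not seen until the next refresh.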



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineagePlugin.java:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.lineage;
+
+import org.apache.doris.extension.spi.Plugin;
+
+/**
+ * SPI interface for lineage plugins.
+ * <p>
+ * Extends the generic {@link Plugin} from {@code fe-extension-spi}.
+ * Implementations receive {@link LineageInfo} events from the {@link LineageEventProcessor}
+ * and can filter or process them as needed.
+ * </p>
+ */
+public interface LineagePlugin extends Plugin {
+
+    /**
+     * Returns the unique name of this plugin, used for activation via
+     * {@code Config.activate_lineage_plugin}.
+     */
+    String name();
+
+    /**
+     * Returns {@code true} if this plugin should receive lineage events
+     * under the current configuration. Called before each event dispatch.
+     */
+    boolean eventFilter();
+
+    /**
+     * Processes a lineage event.

Review Comment:
   **Issue #5 (Low): SPI contract should document thread-safety requirement**
   
   `eventFilter()` and `exec()` are called from the single Worker thread, but 
`eventFilter()` is also called from arbitrary query threads (via 
`hasActivePlugins()` in `isLineagePluginConfigured()`). The SPI contract should 
document that:
   1. `eventFilter()` must be thread-safe (called from multiple threads 
concurrently)
   2. `exec()` is called from a single worker thread (but concurrent with 
`eventFilter()` calls)
   
   This helps plugin implementors avoid subtle concurrency bugs.
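   
   One possible way to word the contract, shown with a sample implementation that satisfies it. This is a sketch only: `DocumentedLineagePlugin` and `CountingLineagePlugin` are hypothetical names, and the `exec(Object)` signature is an assumption, since the real parameter type is not visible in this hunk:
   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical mirror of the SPI with the threading contract spelled out.
   // The exec(Object) signature is an assumption; the real one may differ.
   interface DocumentedLineagePlugin {
       /**
        * Must be thread-safe: may be called concurrently from query threads
        * and from the lineage worker thread.
        */
       boolean eventFilter();

       /**
        * Called from a single worker thread, but may overlap with
        * concurrent eventFilter() calls.
        */
       boolean exec(Object lineageInfo);
   }

   // Sample implementation: shared state is either volatile or atomic.
   final class CountingLineagePlugin implements DocumentedLineagePlugin {
       private volatile boolean enabled;                       // read from many threads
       private final AtomicLong processed = new AtomicLong();  // updated by the worker

       void setEnabled(boolean enabled) {
           this.enabled = enabled;
       }

       @Override
       public boolean eventFilter() {
           return enabled;
       }

       @Override
       public boolean exec(Object lineageInfo) {
           if (lineageInfo == null) {
               return false;
           }
           processed.incrementAndGet();
           return true;
       }

       long processedCount() {
           return processed.get();
       }
   }
   ```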



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

