seawinde commented on code in PR #60122: URL: https://github.com/apache/doris/pull/60122#discussion_r2727272780
########## fe/fe-core/src/main/java/org/apache/doris/nereids/lineage/LineageUtils.java: ########## @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.lineage; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.InlineTable; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Utility methods for lineage event construction and filtering. + */ +public final class LineageUtils { + + public static final Logger LOG = LogManager.getLogger(LineageUtils.class); + private static final String EMPTY_STRING = ""; + private static final String CATALOG_TYPE_KEY = "type"; + private static final int NO_PLUGINS = 0; + private static final long UNKNOWN_START_TIME_MS = 0L; + private static final long UNKNOWN_DURATION_MS = 0L; + + private LineageUtils() { + } + + /** + * Check whether the parsed statement matches the current command type. + * + * @param executor statement executor containing parsed statement + * @param currentCommand current command class + * @return true if parsed command matches current command + */ + public static boolean isSameParsedCommand(StmtExecutor executor, Class<? extends Command> currentCommand) { + if (executor == null || currentCommand == null) { + return false; + } + StatementBase parsedStmt = executor.getParsedStmt(); + if (!(parsedStmt instanceof LogicalPlanAdapter)) { + return false; + } + Plan parsedPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + if (!(parsedPlan instanceof Command)) { + return false; + } + return parsedPlan.getClass().equals(currentCommand); + } + + /** + * Build a lineage event and compute lineage info if lineage plugins are enabled. + * + * @param plan the plan to extract lineage from + * @param sourceCommand the command type for the event + * @param ctx connect context holding query metadata + * @param executor statement executor for query text + */ + public static LineageEvent buildLineageEvent(Plan plan, Class<? extends Command> sourceCommand, + ConnectContext ctx, StmtExecutor executor) { + if (plan == null || ctx == null) { + return null; + } + LineageInfo lineageInfo = LineageInfoExtractor.extractLineageInfo(plan); + LineageContext context = buildLineageContext(sourceCommand, ctx, executor); + String catalog = safeString(ctx.getDefaultCatalog()); + context.setCatalog(catalog); + context.setExternalCatalogProperties(collectExternalCatalogProperties(lineageInfo)); + lineageInfo.setContext(context); + return new LineageEvent(lineageInfo); + } + + /** + * Submit lineage event if lineage plugins are enabled and command matches parsed statement. + * + * @param executor statement executor containing parsed statement + * @param lineagePlan optional lineage plan to use instead of current plan + * @param currentPlan current logical plan + * @param currentHandleClass current command class + */ + public static void submitLineageEventIfNeeded(StmtExecutor executor, Optional<Plan> lineagePlan, + LogicalPlan currentPlan, + Class<? extends Command> currentHandleClass) { + if (!LineageUtils.isSameParsedCommand(executor, currentHandleClass)) { + return; + } + if (!isLineagePluginConfigured()) { + return; + } + Plan plan = lineagePlan.orElse(currentPlan); + if (shouldSkipLineage(plan)) { + return; + } + try { + LineageEvent lineageEvent = LineageUtils.buildLineageEvent(plan, currentHandleClass, + executor.getContext(), executor); + if (lineageEvent != null) { + Env.getCurrentEnv().getLineageEventProcessor().submitLineageEvent(lineageEvent); + } + } catch (Exception e) { + // Log and ignore exceptions during lineage processing to avoid impacting query execution + LOG.error("Failed to submit lineage event", e); + } + } + + public static boolean shouldSkipLineage(Plan plan) { + return isValuesOnly(plan) || isInternalSchemaTarget(plan); + } + + private static boolean isValuesOnly(Plan plan) { + if (plan.containsType(LogicalCatalogRelation.class)) { + return false; + } + return plan.containsType(InlineTable.class, LogicalUnion.class, LogicalOneRowRelation.class); + } + + private static boolean isInternalSchemaTarget(Plan plan) { + Optional<LogicalTableSink> sink = plan.collectFirst(node -> node instanceof LogicalTableSink); + if (!sink.isPresent()) { + return false; + } + TableIf targetTable = sink.get().getTargetTable(); + if (targetTable == null || targetTable.getDatabase() == null + || targetTable.getDatabase().getCatalog() == null) { + return false; + } + String catalogName = targetTable.getDatabase().getCatalog().getName(); + String dbName = targetTable.getDatabase().getFullName(); + return InternalCatalog.INTERNAL_CATALOG_NAME.equalsIgnoreCase(catalogName) + && FeConstants.INTERNAL_DB_NAME.equalsIgnoreCase(dbName); + } + + private static Map<String, Map<String, String>> collectExternalCatalogProperties(LineageInfo lineageInfo) { Review Comment: because some lineage plugin need external catalog property such as uri and so on -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
