maytasm commented on code in PR #19107: URL: https://github.com/apache/druid/pull/19107#discussion_r3024821410
########## extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java: ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.extensions.openlineage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlWith; +import org.apache.calcite.sql.SqlWithItem; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.RequestLogger; +import org.apache.druid.utils.CloseableUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * OpenLineage RunEvents for completed Druid queries. 
+ */ +public class OpenLineageRequestLogger implements RequestLogger +{ + private static final Logger log = new Logger(OpenLineageRequestLogger.class); + + private static final String PRODUCER = + "https://github.com/apache/druid/extensions-contrib/openlineage-emitter"; + private static final String SCHEMA_URL = + "https://openlineage.io/spec/2-0-2/OpenLineage.json"; + private static final String ENGINE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json"; + private static final String ERROR_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json"; + private static final String JOB_TYPE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json"; + private static final String SQL_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json"; + + // Standard Calcite parser (not Druid's custom parser): sufficient for table name extraction. + // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the SqlParseException handler. + private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser + .config() + .withCaseSensitive(true) + .withUnquotedCasing(Casing.UNCHANGED) + .withQuotedCasing(Casing.UNCHANGED) + .withQuoting(Quoting.DOUBLE_QUOTE); + + private static final int EMIT_QUEUE_CAPACITY = 1000; + private static final int EMIT_THREAD_COUNT = 1; + + /** + * Internal broker query types that are automatically issued for schema discovery and metadata + * caching. These are not user-initiated and produce noisy, low-value lineage events. 
+ */ + private static final Set<String> INTERNAL_QUERY_TYPES = Set.of( + "segmentMetadata", + "dataSourceMetadata", + "timeBoundary" + ); + + private final ObjectMapper jsonMapper; + private final String namespace; + private final OpenLineageRequestLoggerProvider.TransportType transportType; + @Nullable + private final String transportUrl; + @Nullable + private final HttpClient httpClient; + @Nullable + private final ExecutorService emitExecutor; + + public OpenLineageRequestLogger( + ObjectMapper jsonMapper, + String namespace, + OpenLineageRequestLoggerProvider.TransportType transportType, + @Nullable String transportUrl + ) + { + this.jsonMapper = jsonMapper; + this.namespace = namespace; + this.transportType = transportType; + this.transportUrl = transportUrl; + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) { + this.httpClient = HttpClientBuilder.create().build(); + // Bounded queue: if the queue is full, silently drop the event rather than blocking + // the query thread. Uses Druid's Execs for daemon thread naming conventions. 
+ this.emitExecutor = new ThreadPoolExecutor( + EMIT_THREAD_COUNT, + EMIT_THREAD_COUNT, + 60L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(EMIT_QUEUE_CAPACITY), + Execs.makeThreadFactory("OpenLineageEmitter-%d"), + new ThreadPoolExecutor.DiscardPolicy() + ); + } else { + this.httpClient = null; + this.emitExecutor = null; + } + } + + @LifecycleStart + @Override + public void start() throws Exception + { + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP && transportUrl == null) { + throw new IllegalStateException( + "druid.request.logging.transportUrl must be set when transportType=HTTP" + ); + } + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) { + log.info("Started OpenLineage HTTP transport to [%s]", transportUrl); + } else { + log.info("Started OpenLineage console transport"); + } + } + + @LifecycleStop + @Override + public void stop() + { + if (emitExecutor != null) { + emitExecutor.shutdown(); + try { + if (!emitExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + emitExecutor.shutdownNow(); + } + } + catch (InterruptedException e) { + emitExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + if (httpClient instanceof Closeable) { + CloseableUtils.closeAndSuppressExceptions( + (Closeable) httpClient, + e -> log.warn(e, "Failed to close OpenLineage HTTP client") + ); + } + log.info("Stopped OpenLineage request logger"); + } + + @Override + public void logNativeQuery(RequestLogLine requestLogLine) throws IOException + { + if (requestLogLine.getQuery() == null) { + return; + } + + // Skip native sub-queries of a SQL execution to avoid duplicating the SQL-level event. 
+ if (requestLogLine.getQuery().getContext().get(BaseQuery.SQL_QUERY_ID) != null) { + return; + } + + String queryType = requestLogLine.getQuery().getType(); + + if (INTERNAL_QUERY_TYPES.contains(queryType)) { + return; + } + + List<String> inputs = new ArrayList<>(new LinkedHashSet<>(requestLogLine.getQuery().getDataSource().getTableNames())); + String queryId = requestLogLine.getQuery().getId(); + if (queryId == null) { + queryId = UUID.randomUUID().toString(); + } + + emit(buildRunEvent(queryId, null, queryType, requestLogLine, inputs, null)); + } + + @Override + public void logSqlQuery(RequestLogLine requestLogLine) throws IOException + { + String sql = requestLogLine.getSql(); + List<String> inputs = new ArrayList<>(); + String output = null; + + if (sql != null) { + try { + SqlNode parsed = SqlParser.create(sql, SQL_PARSER_CONFIG).parseQuery(); + inputs = extractInputs(parsed); + output = extractOutput(parsed); + } + catch (SqlParseException e) { + // Druid-specific SQL extensions (REPLACE, EXTERN, etc.) may not parse with the standard + // Calcite parser. Emit the event without table-level lineage rather than failing. + log.debug( + "OpenLineage: could not parse SQL for lineage extraction (query will still be emitted): %s", + e.getMessage() + ); + } + } + + String queryId = extractSqlQueryId(requestLogLine); + emit(buildRunEvent(queryId, sql, "sql", requestLogLine, new ArrayList<>(new LinkedHashSet<>(inputs)), output)); + } + + private ObjectNode buildRunEvent( + String queryId, + @Nullable String sql, + String queryType, + RequestLogLine requestLogLine, + List<String> inputs, + @Nullable String output + ) + { + Map<String, Object> stats = requestLogLine.getQueryStats().getStats(); + boolean success = Boolean.TRUE.equals(stats.get("success")); + + ObjectNode event = jsonMapper.createObjectNode(); + event.put("eventType", success ? 
"COMPLETE" : "FAIL"); + event.put("eventTime", requestLogLine.getTimestamp().toInstant().toString()); + event.put("producer", PRODUCER); + event.put("schemaURL", SCHEMA_URL); + event.set("run", buildRun(queryId, queryType, requestLogLine, stats, success)); + event.set("job", buildJob(queryId, sql)); + event.set("inputs", buildDatasets(inputs)); + event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of())); + return event; + } + + private ObjectNode buildRun( + String queryId, + String queryType, + RequestLogLine requestLogLine, + Map<String, Object> stats, + boolean success + ) + { + ObjectNode run = jsonMapper.createObjectNode(); + run.put("runId", UUID.nameUUIDFromBytes(queryId.getBytes(StandardCharsets.UTF_8)).toString()); + + ObjectNode facets = jsonMapper.createObjectNode(); + + ObjectNode engineFacet = createFacet(ENGINE_FACET_SCHEMA_URL); + engineFacet.put("name", "druid"); + engineFacet.put("version", getDruidVersion()); + facets.set("processing_engine", engineFacet); + + ObjectNode contextFacet = createFacet(null); + contextFacet.put("queryType", queryType); + contextFacet.put("remoteAddress", requestLogLine.getRemoteAddr()); + Object identity = stats.get("identity"); + if (identity != null) { + contextFacet.put("identity", identity.toString()); + } + Object nativeQueryIds = requestLogLine.getSqlQueryContext().get("nativeQueryIds"); + if (nativeQueryIds != null) { + contextFacet.put("nativeQueryIds", nativeQueryIds.toString()); + } + facets.set("druid_query_context", contextFacet); + + ObjectNode statsFacet = createFacet(null); + putLongStat(statsFacet, "durationMs", stats, "sqlQuery/time", "query/time"); + putLongStat(statsFacet, "bytes", stats, "sqlQuery/bytes", "query/bytes"); + putLongStat(statsFacet, "planningTimeMs", stats, "sqlQuery/planningTimeMs"); + Object statusCode = stats.get("statusCode"); + if (statusCode != null) { + statsFacet.put("statusCode", statusCode.toString()); + } + facets.set("druid_query_statistics", 
statsFacet); + + if (!success) { + Object exception = stats.get("exception"); + if (exception != null) { + ObjectNode errorFacet = createFacet(ERROR_FACET_SCHEMA_URL); + errorFacet.put("message", exception.toString()); + if ("sql".equals(queryType)) { + errorFacet.put("programmingLanguage", "SQL"); + } + facets.set("errorMessage", errorFacet); + } + } + + run.set("facets", facets); + return run; + } + + private ObjectNode buildJob(String queryId, @Nullable String sql) + { + ObjectNode job = jsonMapper.createObjectNode(); + job.put("namespace", namespace); + job.put("name", queryId); + + ObjectNode facets = jsonMapper.createObjectNode(); + + ObjectNode jobTypeFacet = createFacet(JOB_TYPE_FACET_SCHEMA_URL); + jobTypeFacet.put("processingType", "BATCH"); + jobTypeFacet.put("integration", "DRUID"); + jobTypeFacet.put("jobType", "QUERY"); + facets.set("jobType", jobTypeFacet); + + if (sql != null) { Review Comment: Does it make sense to include native query if this is not sql? ########## docs/development/extensions-contrib/openlineage-emitter.md: ########## @@ -0,0 +1,95 @@ +--- +id: openlineage-emitter +title: "OpenLineage Emitter" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `openlineage-emitter` in the extensions load list. + +## Introduction + +This extension emits [OpenLineage](https://openlineage.io) `RunEvent`s for each completed Druid query, enabling data lineage tracking with any OpenLineage-compatible backend such as [Marquez](https://marquezproject.ai). + +For SQL queries, the SQL text is parsed to extract input datasources (FROM clauses, JOINs, CTEs) and the output datasource (INSERT INTO). For native queries, table names are resolved from the datasource tree. Native sub-queries spawned by a SQL execution are deduplicated against the SQL-level event. + +:::note +SQL table extraction relies on `calcite-core` being on the classpath, which is the case on Broker nodes. Native query lineage is available on all nodes. +::: + +## Configuration + +All configuration parameters are under `druid.request.logging`. + +| Property | Description | Required | Default | +|---|---|---|---| +| `druid.request.logging.type` | Set to `openlineage` to enable this extension. | yes | — | +| `druid.request.logging.namespace` | Namespace used for OpenLineage job and dataset URIs. Typically the Broker URL. | no | `druid://localhost` | +| `druid.request.logging.transportType` | Where to send events. `CONSOLE` logs JSON to the Druid log; `HTTP` POSTs to an OpenLineage API endpoint. | no | `CONSOLE` | +| `druid.request.logging.transportUrl` | OpenLineage API endpoint URL. Required when `transportType=HTTP`. 
| no | — | + +### Examples + +**Console (development)** + +```properties +druid.request.logging.type=openlineage +druid.request.logging.namespace=druid://broker.prod:8082 +``` + +**HTTP (production)** + +```properties +druid.request.logging.type=openlineage +druid.request.logging.namespace=druid://broker.prod:8082 +druid.request.logging.transportType=HTTP +druid.request.logging.transportUrl=http://marquez:5000/api/v1/lineage +``` + +**Combined with another logger using the `composing` provider** + +```properties +druid.request.logging.type=composing +druid.request.logging.loggerProviders[0].type=slf4j Review Comment: Are these correct? Don't you have to set `druid.request.logging.loggerProviders` to a list containing one map per logger? ########## docs/development/extensions-contrib/openlineage-emitter.md: ########## @@ -0,0 +1,95 @@ +--- +id: openlineage-emitter +title: "OpenLineage Emitter" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `openlineage-emitter` in the extensions load list. 
+ +## Introduction + +This extension emits [OpenLineage](https://openlineage.io) `RunEvent`s for each completed Druid query, enabling data lineage tracking with any OpenLineage-compatible backend such as [Marquez](https://marquezproject.ai). + +For SQL queries, the SQL text is parsed to extract input datasources (FROM clauses, JOINs, CTEs) and the output datasource (INSERT INTO). For native queries, table names are resolved from the datasource tree. Native sub-queries spawned by a SQL execution are deduplicated against the SQL-level event. + +:::note +SQL table extraction relies on `calcite-core` being on the classpath, which is the case on Broker nodes. Native query lineage is available on all nodes. +::: + +## Configuration + +All configuration parameters are under `druid.request.logging`. + +| Property | Description | Required | Default | +|---|---|---|---| +| `druid.request.logging.type` | Set to `openlineage` to enable this extension. | yes | — | +| `druid.request.logging.namespace` | Namespace used for OpenLineage job and dataset URIs. Typically the Broker URL. | no | `druid://localhost` | Review Comment: Can this default to the hostname? ########## extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java: ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.extensions.openlineage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlWith; +import org.apache.calcite.sql.SqlWithItem; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.RequestLogger; +import org.apache.druid.utils.CloseableUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + 
+import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * OpenLineage RunEvents for completed Druid queries. + */ +public class OpenLineageRequestLogger implements RequestLogger +{ + private static final Logger log = new Logger(OpenLineageRequestLogger.class); + + private static final String PRODUCER = + "https://github.com/apache/druid/extensions-contrib/openlineage-emitter"; + private static final String SCHEMA_URL = + "https://openlineage.io/spec/2-0-2/OpenLineage.json"; + private static final String ENGINE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json"; + private static final String ERROR_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json"; + private static final String JOB_TYPE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json"; + private static final String SQL_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json"; + + // Standard Calcite parser (not Druid's custom parser): sufficient for table name extraction. + // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the SqlParseException handler. 
+ private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser + .config() + .withCaseSensitive(true) + .withUnquotedCasing(Casing.UNCHANGED) + .withQuotedCasing(Casing.UNCHANGED) + .withQuoting(Quoting.DOUBLE_QUOTE); + + private static final int EMIT_QUEUE_CAPACITY = 1000; + private static final int EMIT_THREAD_COUNT = 1; + + /** + * Internal broker query types that are automatically issued for schema discovery and metadata + * caching. These are not user-initiated and produce noisy, low-value lineage events. + */ + private static final Set<String> INTERNAL_QUERY_TYPES = Set.of( + "segmentMetadata", + "dataSourceMetadata", + "timeBoundary" + ); + + private final ObjectMapper jsonMapper; + private final String namespace; + private final OpenLineageRequestLoggerProvider.TransportType transportType; + @Nullable + private final String transportUrl; + @Nullable + private final HttpClient httpClient; + @Nullable + private final ExecutorService emitExecutor; + + public OpenLineageRequestLogger( + ObjectMapper jsonMapper, + String namespace, + OpenLineageRequestLoggerProvider.TransportType transportType, + @Nullable String transportUrl + ) + { + this.jsonMapper = jsonMapper; + this.namespace = namespace; + this.transportType = transportType; + this.transportUrl = transportUrl; + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) { + this.httpClient = HttpClientBuilder.create().build(); Review Comment: We may want to be able to override this client in case we require some specific http client config (keystore, truststore, etc) ########## extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java: ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.extensions.openlineage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlWith; +import org.apache.calcite.sql.SqlWithItem; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.RequestLogger; +import org.apache.druid.utils.CloseableUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import 
org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * OpenLineage RunEvents for completed Druid queries. + */ +public class OpenLineageRequestLogger implements RequestLogger +{ + private static final Logger log = new Logger(OpenLineageRequestLogger.class); + + private static final String PRODUCER = + "https://github.com/apache/druid/extensions-contrib/openlineage-emitter"; + private static final String SCHEMA_URL = + "https://openlineage.io/spec/2-0-2/OpenLineage.json"; + private static final String ENGINE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json"; + private static final String ERROR_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json"; + private static final String JOB_TYPE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json"; + private static final String SQL_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json"; + + // Standard Calcite parser (not Druid's custom parser): sufficient for table name extraction. + // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the SqlParseException handler. 
+ private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser + .config() + .withCaseSensitive(true) + .withUnquotedCasing(Casing.UNCHANGED) + .withQuotedCasing(Casing.UNCHANGED) + .withQuoting(Quoting.DOUBLE_QUOTE); + + private static final int EMIT_QUEUE_CAPACITY = 1000; + private static final int EMIT_THREAD_COUNT = 1; + + /** + * Internal broker query types that are automatically issued for schema discovery and metadata + * caching. These are not user-initiated and produce noisy, low-value lineage events. + */ + private static final Set<String> INTERNAL_QUERY_TYPES = Set.of( + "segmentMetadata", + "dataSourceMetadata", + "timeBoundary" + ); + + private final ObjectMapper jsonMapper; + private final String namespace; + private final OpenLineageRequestLoggerProvider.TransportType transportType; + @Nullable + private final String transportUrl; + @Nullable + private final HttpClient httpClient; + @Nullable + private final ExecutorService emitExecutor; + + public OpenLineageRequestLogger( + ObjectMapper jsonMapper, + String namespace, + OpenLineageRequestLoggerProvider.TransportType transportType, + @Nullable String transportUrl + ) + { + this.jsonMapper = jsonMapper; + this.namespace = namespace; + this.transportType = transportType; + this.transportUrl = transportUrl; + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) { + this.httpClient = HttpClientBuilder.create().build(); + // Bounded queue: if the queue is full, silently drop the event rather than blocking + // the query thread. Uses Druid's Execs for daemon thread naming conventions. 
+ this.emitExecutor = new ThreadPoolExecutor( + EMIT_THREAD_COUNT, + EMIT_THREAD_COUNT, + 60L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(EMIT_QUEUE_CAPACITY), + Execs.makeThreadFactory("OpenLineageEmitter-%d"), + new ThreadPoolExecutor.DiscardPolicy() + ); + } else { + this.httpClient = null; + this.emitExecutor = null; + } + } + + @LifecycleStart + @Override + public void start() throws Exception + { + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP && transportUrl == null) { + throw new IllegalStateException( + "druid.request.logging.transportUrl must be set when transportType=HTTP" + ); + } + if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) { + log.info("Started OpenLineage HTTP transport to [%s]", transportUrl); + } else { + log.info("Started OpenLineage console transport"); + } + } + + @LifecycleStop + @Override + public void stop() + { + if (emitExecutor != null) { + emitExecutor.shutdown(); + try { + if (!emitExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + emitExecutor.shutdownNow(); + } + } + catch (InterruptedException e) { + emitExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + if (httpClient instanceof Closeable) { + CloseableUtils.closeAndSuppressExceptions( + (Closeable) httpClient, + e -> log.warn(e, "Failed to close OpenLineage HTTP client") + ); + } + log.info("Stopped OpenLineage request logger"); + } + + @Override + public void logNativeQuery(RequestLogLine requestLogLine) throws IOException + { + if (requestLogLine.getQuery() == null) { + return; + } + + // Skip native sub-queries of a SQL execution to avoid duplicating the SQL-level event. 
+ if (requestLogLine.getQuery().getContext().get(BaseQuery.SQL_QUERY_ID) != null) { + return; + } + + String queryType = requestLogLine.getQuery().getType(); + + if (INTERNAL_QUERY_TYPES.contains(queryType)) { + return; + } + + List<String> inputs = new ArrayList<>(new LinkedHashSet<>(requestLogLine.getQuery().getDataSource().getTableNames())); + String queryId = requestLogLine.getQuery().getId(); + if (queryId == null) { + queryId = UUID.randomUUID().toString(); + } + + emit(buildRunEvent(queryId, null, queryType, requestLogLine, inputs, null)); + } + + @Override + public void logSqlQuery(RequestLogLine requestLogLine) throws IOException + { + String sql = requestLogLine.getSql(); + List<String> inputs = new ArrayList<>(); + String output = null; + + if (sql != null) { + try { + SqlNode parsed = SqlParser.create(sql, SQL_PARSER_CONFIG).parseQuery(); + inputs = extractInputs(parsed); + output = extractOutput(parsed); + } + catch (SqlParseException e) { + // Druid-specific SQL extensions (REPLACE, EXTERN, etc.) may not parse with the standard + // Calcite parser. Emit the event without table-level lineage rather than failing. + log.debug( + "OpenLineage: could not parse SQL for lineage extraction (query will still be emitted): %s", + e.getMessage() + ); + } + } + + String queryId = extractSqlQueryId(requestLogLine); + emit(buildRunEvent(queryId, sql, "sql", requestLogLine, new ArrayList<>(new LinkedHashSet<>(inputs)), output)); + } + + private ObjectNode buildRunEvent( + String queryId, + @Nullable String sql, + String queryType, + RequestLogLine requestLogLine, + List<String> inputs, + @Nullable String output + ) + { + Map<String, Object> stats = requestLogLine.getQueryStats().getStats(); + boolean success = Boolean.TRUE.equals(stats.get("success")); + + ObjectNode event = jsonMapper.createObjectNode(); + event.put("eventType", success ? 
"COMPLETE" : "FAIL"); + event.put("eventTime", requestLogLine.getTimestamp().toInstant().toString()); + event.put("producer", PRODUCER); + event.put("schemaURL", SCHEMA_URL); + event.set("run", buildRun(queryId, queryType, requestLogLine, stats, success)); + event.set("job", buildJob(queryId, sql)); + event.set("inputs", buildDatasets(inputs)); + event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of())); + return event; + } + + private ObjectNode buildRun( + String queryId, + String queryType, + RequestLogLine requestLogLine, + Map<String, Object> stats, + boolean success + ) + { + ObjectNode run = jsonMapper.createObjectNode(); + run.put("runId", UUID.nameUUIDFromBytes(queryId.getBytes(StandardCharsets.UTF_8)).toString()); + + ObjectNode facets = jsonMapper.createObjectNode(); + + ObjectNode engineFacet = createFacet(ENGINE_FACET_SCHEMA_URL); + engineFacet.put("name", "druid"); + engineFacet.put("version", getDruidVersion()); + facets.set("processing_engine", engineFacet); + + ObjectNode contextFacet = createFacet(null); + contextFacet.put("queryType", queryType); + contextFacet.put("remoteAddress", requestLogLine.getRemoteAddr()); + Object identity = stats.get("identity"); + if (identity != null) { + contextFacet.put("identity", identity.toString()); + } + Object nativeQueryIds = requestLogLine.getSqlQueryContext().get("nativeQueryIds"); + if (nativeQueryIds != null) { + contextFacet.put("nativeQueryIds", nativeQueryIds.toString()); + } + facets.set("druid_query_context", contextFacet); + + ObjectNode statsFacet = createFacet(null); + putLongStat(statsFacet, "durationMs", stats, "sqlQuery/time", "query/time"); + putLongStat(statsFacet, "bytes", stats, "sqlQuery/bytes", "query/bytes"); + putLongStat(statsFacet, "planningTimeMs", stats, "sqlQuery/planningTimeMs"); + Object statusCode = stats.get("statusCode"); + if (statusCode != null) { + statsFacet.put("statusCode", statusCode.toString()); + } + facets.set("druid_query_statistics", 
statsFacet); + + if (!success) { + Object exception = stats.get("exception"); + if (exception != null) { + ObjectNode errorFacet = createFacet(ERROR_FACET_SCHEMA_URL); + errorFacet.put("message", exception.toString()); + if ("sql".equals(queryType)) { + errorFacet.put("programmingLanguage", "SQL"); + } + facets.set("errorMessage", errorFacet); + } + } + + run.set("facets", facets); + return run; + } + + private ObjectNode buildJob(String queryId, @Nullable String sql) + { + ObjectNode job = jsonMapper.createObjectNode(); + job.put("namespace", namespace); + job.put("name", queryId); + + ObjectNode facets = jsonMapper.createObjectNode(); + + ObjectNode jobTypeFacet = createFacet(JOB_TYPE_FACET_SCHEMA_URL); + jobTypeFacet.put("processingType", "BATCH"); + jobTypeFacet.put("integration", "DRUID"); + jobTypeFacet.put("jobType", "QUERY"); + facets.set("jobType", jobTypeFacet); + + if (sql != null) { + ObjectNode sqlFacet = createFacet(SQL_FACET_SCHEMA_URL); + sqlFacet.put("query", sql); + facets.set("sql", sqlFacet); + } + + job.set("facets", facets); + return job; + } + + private ObjectNode createFacet(@Nullable String schemaUrl) + { + ObjectNode facet = jsonMapper.createObjectNode(); + facet.put("_producer", PRODUCER); + if (schemaUrl != null) { + facet.put("_schemaURL", schemaUrl); + } + return facet; + } + + private ArrayNode buildDatasets(List<String> tableNames) + { + ArrayNode array = jsonMapper.createArrayNode(); + for (String name : tableNames) { + ObjectNode node = jsonMapper.createObjectNode(); + node.put("namespace", namespace); + node.put("name", name); + node.set("facets", jsonMapper.createObjectNode()); Review Comment: Can we create a Github issue to track work to include the fields? ########## extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java: ########## @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.extensions.openlineage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOrderBy; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlWith; +import org.apache.calcite.sql.SqlWithItem; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.RequestLogger; +import org.apache.druid.utils.CloseableUtils; +import 
org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * OpenLineage RunEvents for completed Druid queries. + */ +public class OpenLineageRequestLogger implements RequestLogger +{ + private static final Logger log = new Logger(OpenLineageRequestLogger.class); + + private static final String PRODUCER = + "https://github.com/apache/druid/extensions-contrib/openlineage-emitter"; + private static final String SCHEMA_URL = + "https://openlineage.io/spec/2-0-2/OpenLineage.json"; + private static final String ENGINE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json"; + private static final String ERROR_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json"; + private static final String JOB_TYPE_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json"; + private static final String SQL_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json"; + + // Standard Calcite parser (not Druid's custom parser): sufficient for table name extraction. + // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the SqlParseException handler. 
+ private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser + .config() + .withCaseSensitive(true) + .withUnquotedCasing(Casing.UNCHANGED) + .withQuotedCasing(Casing.UNCHANGED) + .withQuoting(Quoting.DOUBLE_QUOTE); + + private static final int EMIT_QUEUE_CAPACITY = 1000; + private static final int EMIT_THREAD_COUNT = 1; + + /** + * Internal broker query types that are automatically issued for schema discovery and metadata + * caching. These are not user-initiated and produce noisy, low-value lineage events. + */ + private static final Set<String> INTERNAL_QUERY_TYPES = Set.of( Review Comment: Maybe make this configurable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
