maytasm commented on code in PR #19107:
URL: https://github.com/apache/druid/pull/19107#discussion_r3024821410


##########
extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java:
##########
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * OpenLineage RunEvents for completed Druid queries.
+ */
+public class OpenLineageRequestLogger implements RequestLogger
+{
+  private static final Logger log = new Logger(OpenLineageRequestLogger.class);
+
+  private static final String PRODUCER =
+      "https://github.com/apache/druid/extensions-contrib/openlineage-emitter";
+  private static final String SCHEMA_URL =
+      "https://openlineage.io/spec/2-0-2/OpenLineage.json";
+  private static final String ENGINE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json";
+  private static final String ERROR_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json";
+  private static final String JOB_TYPE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json";
+  private static final String SQL_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json";
+
+  // Standard Calcite parser (not Druid's custom parser): sufficient for table 
name extraction.
+  // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the 
SqlParseException handler.
+  private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser
+      .config()
+      .withCaseSensitive(true)
+      .withUnquotedCasing(Casing.UNCHANGED)
+      .withQuotedCasing(Casing.UNCHANGED)
+      .withQuoting(Quoting.DOUBLE_QUOTE);
+
+  private static final int EMIT_QUEUE_CAPACITY = 1000;
+  private static final int EMIT_THREAD_COUNT = 1;
+
+  /**
+   * Internal broker query types that are automatically issued for schema 
discovery and metadata
+   * caching. These are not user-initiated and produce noisy, low-value 
lineage events.
+   */
+  private static final Set<String> INTERNAL_QUERY_TYPES = Set.of(
+      "segmentMetadata",
+      "dataSourceMetadata",
+      "timeBoundary"
+  );
+
+  private final ObjectMapper jsonMapper;
+  private final String namespace;
+  private final OpenLineageRequestLoggerProvider.TransportType transportType;
+  @Nullable
+  private final String transportUrl;
+  @Nullable
+  private final HttpClient httpClient;
+  @Nullable
+  private final ExecutorService emitExecutor;
+
+  public OpenLineageRequestLogger(
+      ObjectMapper jsonMapper,
+      String namespace,
+      OpenLineageRequestLoggerProvider.TransportType transportType,
+      @Nullable String transportUrl
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.namespace = namespace;
+    this.transportType = transportType;
+    this.transportUrl = transportUrl;
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      this.httpClient = HttpClientBuilder.create().build();
+      // Bounded queue: if the queue is full, silently drop the event rather 
than blocking
+      // the query thread. Uses Druid's Execs for daemon thread naming 
conventions.
+      this.emitExecutor = new ThreadPoolExecutor(
+          EMIT_THREAD_COUNT,
+          EMIT_THREAD_COUNT,
+          60L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(EMIT_QUEUE_CAPACITY),
+          Execs.makeThreadFactory("OpenLineageEmitter-%d"),
+          new ThreadPoolExecutor.DiscardPolicy()
+      );
+    } else {
+      this.httpClient = null;
+      this.emitExecutor = null;
+    }
+  }
+
+  @LifecycleStart
+  @Override
+  public void start() throws Exception
+  {
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP 
&& transportUrl == null) {
+      throw new IllegalStateException(
+          "druid.request.logging.transportUrl must be set when 
transportType=HTTP"
+      );
+    }
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      log.info("Started OpenLineage HTTP transport to [%s]", transportUrl);
+    } else {
+      log.info("Started OpenLineage console transport");
+    }
+  }
+
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    if (emitExecutor != null) {
+      emitExecutor.shutdown();
+      try {
+        if (!emitExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          emitExecutor.shutdownNow();
+        }
+      }
+      catch (InterruptedException e) {
+        emitExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (httpClient instanceof Closeable) {
+      CloseableUtils.closeAndSuppressExceptions(
+          (Closeable) httpClient,
+          e -> log.warn(e, "Failed to close OpenLineage HTTP client")
+      );
+    }
+    log.info("Stopped OpenLineage request logger");
+  }
+
+  @Override
+  public void logNativeQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    if (requestLogLine.getQuery() == null) {
+      return;
+    }
+
+    // Skip native sub-queries of a SQL execution to avoid duplicating the 
SQL-level event.
+    if (requestLogLine.getQuery().getContext().get(BaseQuery.SQL_QUERY_ID) != 
null) {
+      return;
+    }
+
+    String queryType = requestLogLine.getQuery().getType();
+
+    if (INTERNAL_QUERY_TYPES.contains(queryType)) {
+      return;
+    }
+
+    List<String> inputs = new ArrayList<>(new 
LinkedHashSet<>(requestLogLine.getQuery().getDataSource().getTableNames()));
+    String queryId = requestLogLine.getQuery().getId();
+    if (queryId == null) {
+      queryId = UUID.randomUUID().toString();
+    }
+
+    emit(buildRunEvent(queryId, null, queryType, requestLogLine, inputs, 
null));
+  }
+
+  @Override
+  public void logSqlQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    String sql = requestLogLine.getSql();
+    List<String> inputs = new ArrayList<>();
+    String output = null;
+
+    if (sql != null) {
+      try {
+        SqlNode parsed = SqlParser.create(sql, SQL_PARSER_CONFIG).parseQuery();
+        inputs = extractInputs(parsed);
+        output = extractOutput(parsed);
+      }
+      catch (SqlParseException e) {
+        // Druid-specific SQL extensions (REPLACE, EXTERN, etc.) may not parse 
with the standard
+        // Calcite parser. Emit the event without table-level lineage rather 
than failing.
+        log.debug(
+            "OpenLineage: could not parse SQL for lineage extraction (query 
will still be emitted): %s",
+            e.getMessage()
+        );
+      }
+    }
+
+    String queryId = extractSqlQueryId(requestLogLine);
+    emit(buildRunEvent(queryId, sql, "sql", requestLogLine, new 
ArrayList<>(new LinkedHashSet<>(inputs)), output));
+  }
+
+  private ObjectNode buildRunEvent(
+      String queryId,
+      @Nullable String sql,
+      String queryType,
+      RequestLogLine requestLogLine,
+      List<String> inputs,
+      @Nullable String output
+  )
+  {
+    Map<String, Object> stats = requestLogLine.getQueryStats().getStats();
+    boolean success = Boolean.TRUE.equals(stats.get("success"));
+
+    ObjectNode event = jsonMapper.createObjectNode();
+    event.put("eventType", success ? "COMPLETE" : "FAIL");
+    event.put("eventTime", 
requestLogLine.getTimestamp().toInstant().toString());
+    event.put("producer", PRODUCER);
+    event.put("schemaURL", SCHEMA_URL);
+    event.set("run", buildRun(queryId, queryType, requestLogLine, stats, 
success));
+    event.set("job", buildJob(queryId, sql));
+    event.set("inputs", buildDatasets(inputs));
+    event.set("outputs", buildDatasets(output != null ? List.of(output) : 
List.of()));
+    return event;
+  }
+
+  private ObjectNode buildRun(
+      String queryId,
+      String queryType,
+      RequestLogLine requestLogLine,
+      Map<String, Object> stats,
+      boolean success
+  )
+  {
+    ObjectNode run = jsonMapper.createObjectNode();
+    run.put("runId", 
UUID.nameUUIDFromBytes(queryId.getBytes(StandardCharsets.UTF_8)).toString());
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode engineFacet = createFacet(ENGINE_FACET_SCHEMA_URL);
+    engineFacet.put("name", "druid");
+    engineFacet.put("version", getDruidVersion());
+    facets.set("processing_engine", engineFacet);
+
+    ObjectNode contextFacet = createFacet(null);
+    contextFacet.put("queryType", queryType);
+    contextFacet.put("remoteAddress", requestLogLine.getRemoteAddr());
+    Object identity = stats.get("identity");
+    if (identity != null) {
+      contextFacet.put("identity", identity.toString());
+    }
+    Object nativeQueryIds = 
requestLogLine.getSqlQueryContext().get("nativeQueryIds");
+    if (nativeQueryIds != null) {
+      contextFacet.put("nativeQueryIds", nativeQueryIds.toString());
+    }
+    facets.set("druid_query_context", contextFacet);
+
+    ObjectNode statsFacet = createFacet(null);
+    putLongStat(statsFacet, "durationMs", stats, "sqlQuery/time", 
"query/time");
+    putLongStat(statsFacet, "bytes", stats, "sqlQuery/bytes", "query/bytes");
+    putLongStat(statsFacet, "planningTimeMs", stats, 
"sqlQuery/planningTimeMs");
+    Object statusCode = stats.get("statusCode");
+    if (statusCode != null) {
+      statsFacet.put("statusCode", statusCode.toString());
+    }
+    facets.set("druid_query_statistics", statsFacet);
+
+    if (!success) {
+      Object exception = stats.get("exception");
+      if (exception != null) {
+        ObjectNode errorFacet = createFacet(ERROR_FACET_SCHEMA_URL);
+        errorFacet.put("message", exception.toString());
+        if ("sql".equals(queryType)) {
+          errorFacet.put("programmingLanguage", "SQL");
+        }
+        facets.set("errorMessage", errorFacet);
+      }
+    }
+
+    run.set("facets", facets);
+    return run;
+  }
+
+  private ObjectNode buildJob(String queryId, @Nullable String sql)
+  {
+    ObjectNode job = jsonMapper.createObjectNode();
+    job.put("namespace", namespace);
+    job.put("name", queryId);
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode jobTypeFacet = createFacet(JOB_TYPE_FACET_SCHEMA_URL);
+    jobTypeFacet.put("processingType", "BATCH");
+    jobTypeFacet.put("integration", "DRUID");
+    jobTypeFacet.put("jobType", "QUERY");
+    facets.set("jobType", jobTypeFacet);
+
+    if (sql != null) {

Review Comment:
   Does it make sense to include the native query here if this is not SQL?



##########
docs/development/extensions-contrib/openlineage-emitter.md:
##########
@@ -0,0 +1,95 @@
+---
+id: openlineage-emitter
+title: "OpenLineage Emitter"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+To use this Apache Druid extension, 
[include](../../configuration/extensions.md#loading-extensions) 
`openlineage-emitter` in the extensions load list.
+
+## Introduction
+
+This extension emits [OpenLineage](https://openlineage.io) `RunEvent`s for 
each completed Druid query, enabling data lineage tracking with any 
OpenLineage-compatible backend such as [Marquez](https://marquezproject.ai).
+
+For SQL queries, the SQL text is parsed to extract input datasources (FROM 
clauses, JOINs, CTEs) and the output datasource (INSERT INTO). For native 
queries, table names are resolved from the datasource tree. Native sub-queries 
spawned by a SQL execution are deduplicated against the SQL-level event.
+
+:::note
+SQL table extraction relies on `calcite-core` being on the classpath, which is 
the case on Broker nodes. Native query lineage is available on all nodes.
+:::
+
+## Configuration
+
+All configuration parameters are under `druid.request.logging`.
+
+| Property | Description | Required | Default |
+|---|---|---|---|
+| `druid.request.logging.type` | Set to `openlineage` to enable this 
extension. | yes | — |
+| `druid.request.logging.namespace` | Namespace used for OpenLineage job and 
dataset URIs. Typically the Broker URL. | no | `druid://localhost` |
+| `druid.request.logging.transportType` | Where to send events. `CONSOLE` logs 
JSON to the Druid log; `HTTP` POSTs to an OpenLineage API endpoint. | no | 
`CONSOLE` |
+| `druid.request.logging.transportUrl` | OpenLineage API endpoint URL. 
Required when `transportType=HTTP`. | no | — |
+
+### Examples
+
+**Console (development)**
+
+```properties
+druid.request.logging.type=openlineage
+druid.request.logging.namespace=druid://broker.prod:8082
+```
+
+**HTTP (production)**
+
+```properties
+druid.request.logging.type=openlineage
+druid.request.logging.namespace=druid://broker.prod:8082
+druid.request.logging.transportType=HTTP
+druid.request.logging.transportUrl=http://marquez:5000/api/v1/lineage
+```
+
+**Combined with another logger using the `composing` provider**
+
+```properties
+druid.request.logging.type=composing
+druid.request.logging.loggerProviders[0].type=slf4j

Review Comment:
   Are these correct? Don't you have to set `druid.request.logging.loggerProviders` to a list containing a map for each logger?



##########
docs/development/extensions-contrib/openlineage-emitter.md:
##########
@@ -0,0 +1,95 @@
+---
+id: openlineage-emitter
+title: "OpenLineage Emitter"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+To use this Apache Druid extension, 
[include](../../configuration/extensions.md#loading-extensions) 
`openlineage-emitter` in the extensions load list.
+
+## Introduction
+
+This extension emits [OpenLineage](https://openlineage.io) `RunEvent`s for 
each completed Druid query, enabling data lineage tracking with any 
OpenLineage-compatible backend such as [Marquez](https://marquezproject.ai).
+
+For SQL queries, the SQL text is parsed to extract input datasources (FROM 
clauses, JOINs, CTEs) and the output datasource (INSERT INTO). For native 
queries, table names are resolved from the datasource tree. Native sub-queries 
spawned by a SQL execution are deduplicated against the SQL-level event.
+
+:::note
+SQL table extraction relies on `calcite-core` being on the classpath, which is 
the case on Broker nodes. Native query lineage is available on all nodes.
+:::
+
+## Configuration
+
+All configuration parameters are under `druid.request.logging`.
+
+| Property | Description | Required | Default |
+|---|---|---|---|
+| `druid.request.logging.type` | Set to `openlineage` to enable this 
extension. | yes | — |
+| `druid.request.logging.namespace` | Namespace used for OpenLineage job and 
dataset URIs. Typically the Broker URL. | no | `druid://localhost` |

Review Comment:
   Can this default to the hostname?



##########
extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java:
##########
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * OpenLineage RunEvents for completed Druid queries.
+ */
+public class OpenLineageRequestLogger implements RequestLogger
+{
+  private static final Logger log = new Logger(OpenLineageRequestLogger.class);
+
+  private static final String PRODUCER =
+      "https://github.com/apache/druid/extensions-contrib/openlineage-emitter";
+  private static final String SCHEMA_URL =
+      "https://openlineage.io/spec/2-0-2/OpenLineage.json";
+  private static final String ENGINE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json";
+  private static final String ERROR_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json";
+  private static final String JOB_TYPE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json";
+  private static final String SQL_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json";
+
+  // Standard Calcite parser (not Druid's custom parser): sufficient for table 
name extraction.
+  // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the 
SqlParseException handler.
+  private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser
+      .config()
+      .withCaseSensitive(true)
+      .withUnquotedCasing(Casing.UNCHANGED)
+      .withQuotedCasing(Casing.UNCHANGED)
+      .withQuoting(Quoting.DOUBLE_QUOTE);
+
+  private static final int EMIT_QUEUE_CAPACITY = 1000;
+  private static final int EMIT_THREAD_COUNT = 1;
+
+  /**
+   * Internal broker query types that are automatically issued for schema 
discovery and metadata
+   * caching. These are not user-initiated and produce noisy, low-value 
lineage events.
+   */
+  private static final Set<String> INTERNAL_QUERY_TYPES = Set.of(
+      "segmentMetadata",
+      "dataSourceMetadata",
+      "timeBoundary"
+  );
+
+  private final ObjectMapper jsonMapper;
+  private final String namespace;
+  private final OpenLineageRequestLoggerProvider.TransportType transportType;
+  @Nullable
+  private final String transportUrl;
+  @Nullable
+  private final HttpClient httpClient;
+  @Nullable
+  private final ExecutorService emitExecutor;
+
+  public OpenLineageRequestLogger(
+      ObjectMapper jsonMapper,
+      String namespace,
+      OpenLineageRequestLoggerProvider.TransportType transportType,
+      @Nullable String transportUrl
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.namespace = namespace;
+    this.transportType = transportType;
+    this.transportUrl = transportUrl;
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      this.httpClient = HttpClientBuilder.create().build();

Review Comment:
   We may want to be able to override this client in case we require some 
specific http client config (keystore, truststore, etc) 



##########
extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java:
##########
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * OpenLineage RunEvents for completed Druid queries.
+ */
+public class OpenLineageRequestLogger implements RequestLogger
+{
+  private static final Logger log = new Logger(OpenLineageRequestLogger.class);
+
+  private static final String PRODUCER =
+      "https://github.com/apache/druid/extensions-contrib/openlineage-emitter";
+  private static final String SCHEMA_URL =
+      "https://openlineage.io/spec/2-0-2/OpenLineage.json";
+  private static final String ENGINE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json";
+  private static final String ERROR_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json";
+  private static final String JOB_TYPE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json";
+  private static final String SQL_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json";
+
+  // Standard Calcite parser (not Druid's custom parser): sufficient for table 
name extraction.
+  // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the 
SqlParseException handler.
+  private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser
+      .config()
+      .withCaseSensitive(true)
+      .withUnquotedCasing(Casing.UNCHANGED)
+      .withQuotedCasing(Casing.UNCHANGED)
+      .withQuoting(Quoting.DOUBLE_QUOTE);
+
+  private static final int EMIT_QUEUE_CAPACITY = 1000;
+  private static final int EMIT_THREAD_COUNT = 1;
+
+  /**
+   * Internal broker query types that are automatically issued for schema 
discovery and metadata
+   * caching. These are not user-initiated and produce noisy, low-value 
lineage events.
+   */
+  private static final Set<String> INTERNAL_QUERY_TYPES = Set.of(
+      "segmentMetadata",
+      "dataSourceMetadata",
+      "timeBoundary"
+  );
+
+  private final ObjectMapper jsonMapper;
+  private final String namespace;
+  private final OpenLineageRequestLoggerProvider.TransportType transportType;
+  @Nullable
+  private final String transportUrl;
+  @Nullable
+  private final HttpClient httpClient;
+  @Nullable
+  private final ExecutorService emitExecutor;
+
+  public OpenLineageRequestLogger(
+      ObjectMapper jsonMapper,
+      String namespace,
+      OpenLineageRequestLoggerProvider.TransportType transportType,
+      @Nullable String transportUrl
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.namespace = namespace;
+    this.transportType = transportType;
+    this.transportUrl = transportUrl;
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      this.httpClient = HttpClientBuilder.create().build();
+      // Bounded queue: if the queue is full, silently drop the event rather 
than blocking
+      // the query thread. Uses Druid's Execs for daemon thread naming 
conventions.
+      this.emitExecutor = new ThreadPoolExecutor(
+          EMIT_THREAD_COUNT,
+          EMIT_THREAD_COUNT,
+          60L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(EMIT_QUEUE_CAPACITY),
+          Execs.makeThreadFactory("OpenLineageEmitter-%d"),
+          new ThreadPoolExecutor.DiscardPolicy()
+      );
+    } else {
+      this.httpClient = null;
+      this.emitExecutor = null;
+    }
+  }
+
+  @LifecycleStart
+  @Override
+  public void start() throws Exception
+  {
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP 
&& transportUrl == null) {
+      throw new IllegalStateException(
+          "druid.request.logging.transportUrl must be set when 
transportType=HTTP"
+      );
+    }
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      log.info("Started OpenLineage HTTP transport to [%s]", transportUrl);
+    } else {
+      log.info("Started OpenLineage console transport");
+    }
+  }
+
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    if (emitExecutor != null) {
+      emitExecutor.shutdown();
+      try {
+        if (!emitExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          emitExecutor.shutdownNow();
+        }
+      }
+      catch (InterruptedException e) {
+        emitExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (httpClient instanceof Closeable) {
+      CloseableUtils.closeAndSuppressExceptions(
+          (Closeable) httpClient,
+          e -> log.warn(e, "Failed to close OpenLineage HTTP client")
+      );
+    }
+    log.info("Stopped OpenLineage request logger");
+  }
+
+  @Override
+  public void logNativeQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    if (requestLogLine.getQuery() == null) {
+      return;
+    }
+
+    // Skip native sub-queries of a SQL execution to avoid duplicating the 
SQL-level event.
+    if (requestLogLine.getQuery().getContext().get(BaseQuery.SQL_QUERY_ID) != 
null) {
+      return;
+    }
+
+    String queryType = requestLogLine.getQuery().getType();
+
+    if (INTERNAL_QUERY_TYPES.contains(queryType)) {
+      return;
+    }
+
+    List<String> inputs = new ArrayList<>(new 
LinkedHashSet<>(requestLogLine.getQuery().getDataSource().getTableNames()));
+    String queryId = requestLogLine.getQuery().getId();
+    if (queryId == null) {
+      queryId = UUID.randomUUID().toString();
+    }
+
+    emit(buildRunEvent(queryId, null, queryType, requestLogLine, inputs, 
null));
+  }
+
+  @Override
+  public void logSqlQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    String sql = requestLogLine.getSql();
+    List<String> inputs = new ArrayList<>();
+    String output = null;
+
+    if (sql != null) {
+      try {
+        SqlNode parsed = SqlParser.create(sql, SQL_PARSER_CONFIG).parseQuery();
+        inputs = extractInputs(parsed);
+        output = extractOutput(parsed);
+      }
+      catch (SqlParseException e) {
+        // Druid-specific SQL extensions (REPLACE, EXTERN, etc.) may not parse 
with the standard
+        // Calcite parser. Emit the event without table-level lineage rather 
than failing.
+        log.debug(
+            "OpenLineage: could not parse SQL for lineage extraction (query 
will still be emitted): %s",
+            e.getMessage()
+        );
+      }
+    }
+
+    String queryId = extractSqlQueryId(requestLogLine);
+    emit(buildRunEvent(queryId, sql, "sql", requestLogLine, new 
ArrayList<>(new LinkedHashSet<>(inputs)), output));
+  }
+
+  private ObjectNode buildRunEvent(
+      String queryId,
+      @Nullable String sql,
+      String queryType,
+      RequestLogLine requestLogLine,
+      List<String> inputs,
+      @Nullable String output
+  )
+  {
+    Map<String, Object> stats = requestLogLine.getQueryStats().getStats();
+    boolean success = Boolean.TRUE.equals(stats.get("success"));
+
+    ObjectNode event = jsonMapper.createObjectNode();
+    event.put("eventType", success ? "COMPLETE" : "FAIL");
+    event.put("eventTime", 
requestLogLine.getTimestamp().toInstant().toString());
+    event.put("producer", PRODUCER);
+    event.put("schemaURL", SCHEMA_URL);
+    event.set("run", buildRun(queryId, queryType, requestLogLine, stats, 
success));
+    event.set("job", buildJob(queryId, sql));
+    event.set("inputs", buildDatasets(inputs));
+    event.set("outputs", buildDatasets(output != null ? List.of(output) : 
List.of()));
+    return event;
+  }
+
+  private ObjectNode buildRun(
+      String queryId,
+      String queryType,
+      RequestLogLine requestLogLine,
+      Map<String, Object> stats,
+      boolean success
+  )
+  {
+    ObjectNode run = jsonMapper.createObjectNode();
+    run.put("runId", 
UUID.nameUUIDFromBytes(queryId.getBytes(StandardCharsets.UTF_8)).toString());
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode engineFacet = createFacet(ENGINE_FACET_SCHEMA_URL);
+    engineFacet.put("name", "druid");
+    engineFacet.put("version", getDruidVersion());
+    facets.set("processing_engine", engineFacet);
+
+    ObjectNode contextFacet = createFacet(null);
+    contextFacet.put("queryType", queryType);
+    contextFacet.put("remoteAddress", requestLogLine.getRemoteAddr());
+    Object identity = stats.get("identity");
+    if (identity != null) {
+      contextFacet.put("identity", identity.toString());
+    }
+    Object nativeQueryIds = 
requestLogLine.getSqlQueryContext().get("nativeQueryIds");
+    if (nativeQueryIds != null) {
+      contextFacet.put("nativeQueryIds", nativeQueryIds.toString());
+    }
+    facets.set("druid_query_context", contextFacet);
+
+    ObjectNode statsFacet = createFacet(null);
+    putLongStat(statsFacet, "durationMs", stats, "sqlQuery/time", 
"query/time");
+    putLongStat(statsFacet, "bytes", stats, "sqlQuery/bytes", "query/bytes");
+    putLongStat(statsFacet, "planningTimeMs", stats, 
"sqlQuery/planningTimeMs");
+    Object statusCode = stats.get("statusCode");
+    if (statusCode != null) {
+      statsFacet.put("statusCode", statusCode.toString());
+    }
+    facets.set("druid_query_statistics", statsFacet);
+
+    if (!success) {
+      Object exception = stats.get("exception");
+      if (exception != null) {
+        ObjectNode errorFacet = createFacet(ERROR_FACET_SCHEMA_URL);
+        errorFacet.put("message", exception.toString());
+        if ("sql".equals(queryType)) {
+          errorFacet.put("programmingLanguage", "SQL");
+        }
+        facets.set("errorMessage", errorFacet);
+      }
+    }
+
+    run.set("facets", facets);
+    return run;
+  }
+
+  private ObjectNode buildJob(String queryId, @Nullable String sql)
+  {
+    ObjectNode job = jsonMapper.createObjectNode();
+    job.put("namespace", namespace);
+    job.put("name", queryId);
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode jobTypeFacet = createFacet(JOB_TYPE_FACET_SCHEMA_URL);
+    jobTypeFacet.put("processingType", "BATCH");
+    jobTypeFacet.put("integration", "DRUID");
+    jobTypeFacet.put("jobType", "QUERY");
+    facets.set("jobType", jobTypeFacet);
+
+    if (sql != null) {
+      ObjectNode sqlFacet = createFacet(SQL_FACET_SCHEMA_URL);
+      sqlFacet.put("query", sql);
+      facets.set("sql", sqlFacet);
+    }
+
+    job.set("facets", facets);
+    return job;
+  }
+
+  private ObjectNode createFacet(@Nullable String schemaUrl)
+  {
+    ObjectNode facet = jsonMapper.createObjectNode();
+    facet.put("_producer", PRODUCER);
+    if (schemaUrl != null) {
+      facet.put("_schemaURL", schemaUrl);
+    }
+    return facet;
+  }
+
+  private ArrayNode buildDatasets(List<String> tableNames)
+  {
+    ArrayNode array = jsonMapper.createArrayNode();
+    for (String name : tableNames) {
+      ObjectNode node = jsonMapper.createObjectNode();
+      node.put("namespace", namespace);
+      node.put("name", name);
+      node.set("facets", jsonMapper.createObjectNode());

Review Comment:
   Can we create a GitHub issue to track the work to include the fields?



##########
extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java:
##########
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * OpenLineage RunEvents for completed Druid queries.
+ */
+public class OpenLineageRequestLogger implements RequestLogger
+{
+  private static final Logger log = new Logger(OpenLineageRequestLogger.class);
+
+  private static final String PRODUCER =
+      "https://github.com/apache/druid/extensions-contrib/openlineage-emitter";;
+  private static final String SCHEMA_URL =
+      "https://openlineage.io/spec/2-0-2/OpenLineage.json";;
+  private static final String ENGINE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json";;
+  private static final String ERROR_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json";;
+  private static final String JOB_TYPE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json";;
+  private static final String SQL_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json";;
+
+  // Standard Calcite parser (not Druid's custom parser): sufficient for table 
name extraction.
+  // Druid-specific syntax (REPLACE, EXTERN, etc.) falls through to the 
SqlParseException handler.
+  private static final SqlParser.Config SQL_PARSER_CONFIG = SqlParser
+      .config()
+      .withCaseSensitive(true)
+      .withUnquotedCasing(Casing.UNCHANGED)
+      .withQuotedCasing(Casing.UNCHANGED)
+      .withQuoting(Quoting.DOUBLE_QUOTE);
+
+  private static final int EMIT_QUEUE_CAPACITY = 1000;
+  private static final int EMIT_THREAD_COUNT = 1;
+
+  /**
+   * Internal broker query types that are automatically issued for schema 
discovery and metadata
+   * caching. These are not user-initiated and produce noisy, low-value 
lineage events.
+   */
+  private static final Set<String> INTERNAL_QUERY_TYPES = Set.of(

Review Comment:
   Maybe make this set of internal query types configurable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to