This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e991fd1bdc feat: add OpenLineage request logger extension (#19107)
0e991fd1bdc is described below

commit 0e991fd1bdc10fb82946c2ef9a6e0c2113085eae
Author: mshahid6 <[email protected]>
AuthorDate: Tue Jun 16 16:07:35 2026 -0700

    feat: add OpenLineage request logger extension (#19107)
    
    Added extensions-contrib/openlineage-emitter as a contrib extension that 
uses the RequestLogger to transform and send lineage information to any 
OpenLineage-compatible API.
    
    For SQL queries, the SQL text is parsed with the Calcite parser to extract 
input datasources (FROM clauses, JOINs, CTEs) and output datasources (INSERT 
INTO). For native queries, table names are read from 
DataSource.getTableNames(). Native sub-queries spawned by a SQL execution are 
deduplicated against the SQL-level event.
    
    Each event includes standard OpenLineage facets (processing_engine, 
jobType, sql, errorMessage) and custom Druid facets (druid_query_context with 
user identity and query metadata, druid_query_statistics with duration and 
bytes).
---
 distribution/pom.xml                               |   2 +
 docs/configuration/extensions.md                   |   1 +
 .../extensions-contrib/openlineage-emitter.md      |  98 ++++
 extensions-contrib/openlineage-emitter/pom.xml     | 143 +++++
 .../openlineage/OpenLineageRequestLogger.java      | 575 ++++++++++++++++++++
 .../OpenLineageRequestLoggerModule.java            |  45 ++
 .../OpenLineageRequestLoggerProvider.java          | 171 ++++++
 .../org.apache.druid.initialization.DruidModule    |  16 +
 .../DruidQueryContextRunFacet.json                 |  29 +
 .../DruidQueryStatisticsRunFacet.json              |  28 +
 .../openlineage/OpenLineageRequestLoggerTest.java  | 597 +++++++++++++++++++++
 pom.xml                                            |   1 +
 website/.spelling                                  |   7 +
 13 files changed, 1713 insertions(+)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index 1eb04f10010..38ac0e6e3c0 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -404,6 +404,8 @@
                                         <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>
                                         <argument>-c</argument>
+                                        
<argument>org.apache.druid.extensions.contrib:openlineage-emitter</argument>
+                                        <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions.contrib:druid-opentsdb-emitter</argument>
                                         <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions.contrib:druid-redis-cache</argument>
diff --git a/docs/configuration/extensions.md b/docs/configuration/extensions.md
index 31f1a5b62b2..2ad0ad180b2 100644
--- a/docs/configuration/extensions.md
+++ b/docs/configuration/extensions.md
@@ -93,6 +93,7 @@ All of these community extensions can be downloaded using 
[pull-deps](../operati
 |graphite-emitter|Graphite metrics 
emitter|[link](../development/extensions-contrib/graphite.md)|
 |statsd-emitter|StatsD metrics 
emitter|[link](../development/extensions-contrib/statsd.md)|
 |kafka-emitter|Kafka metrics 
emitter|[link](../development/extensions-contrib/kafka-emitter.md)|
+|openlineage-emitter|OpenLineage data lineage 
emitter|[link](../development/extensions-contrib/openlineage-emitter.md)|
 |druid-thrift-extensions|Support thrift ingestion 
|[link](../development/extensions-contrib/thrift.md)|
 |druid-opentsdb-emitter|OpenTSDB metrics emitter 
|[link](../development/extensions-contrib/opentsdb-emitter.md)|
 |druid-moving-average-query|Support for [Moving 
Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate 
[Window 
Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions)
 in Druid 
queries.|[link](../development/extensions-contrib/moving-average-query.md)|
diff --git a/docs/development/extensions-contrib/openlineage-emitter.md 
b/docs/development/extensions-contrib/openlineage-emitter.md
new file mode 100644
index 00000000000..6e613cf6320
--- /dev/null
+++ b/docs/development/extensions-contrib/openlineage-emitter.md
@@ -0,0 +1,98 @@
+---
+id: openlineage-emitter
+title: "OpenLineage Emitter"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+To use this Apache Druid extension, 
[include](../../configuration/extensions.md#loading-extensions) 
`openlineage-emitter` in the extensions load list.
+
+## Introduction
+
+This extension emits [OpenLineage](https://openlineage.io) `RunEvent`s for 
each completed Druid query, enabling data lineage tracking with any 
OpenLineage-compatible backend such as [Marquez](https://marquezproject.ai).
+
+For MSQ DML statements (`INSERT INTO` / `REPLACE INTO`), the output datasource 
is extracted from the SQL text and emitted as an output dataset. Input 
extraction from SQL is not performed — reliably resolving `FROM` / `JOIN` 
tables at the logger layer would duplicate planner work. For native queries, 
input table names are resolved from the datasource tree and emitted as input 
datasets. Native sub-queries spawned by a SQL execution carry a `sqlQueryId` in 
their context facet for correlation with the parent SQL event.
+
+:::note
+MSQ DML output extraction relies on `calcite-core` being on the classpath, 
which is the case on Broker nodes. Native query lineage is available on all 
nodes.
+:::
+
+## Configuration
+
+All configuration parameters are under `druid.request.logging`.
+
+| Property | Description | Required | Default |
+|---|---|---|---|
+| `druid.request.logging.type` | Set to `openlineage` to enable this 
extension. | yes | — |
+| `druid.request.logging.namespace` | Namespace used for OpenLineage job and 
dataset URIs. Typically the Broker URL. | no | `druid://<hostname>` |
+| `druid.request.logging.transportType` | Where to send events. `CONSOLE` logs 
JSON to the Druid log; `HTTP` POSTs to an OpenLineage API endpoint. | no | 
`CONSOLE` |
+| `druid.request.logging.transportUrl` | OpenLineage API endpoint URL. 
Required when `transportType=HTTP`. | no | — |
+| `druid.request.logging.excludedNativeQueryTypes` | Native query types to 
exclude from lineage emission. Internal broker queries like segment metadata 
lookups produce noisy, low-value events. | no | `["segmentMetadata", 
"dataSourceMetadata", "timeBoundary"]` |
+| `druid.request.logging.emitQueueCapacity` | Maximum number of events 
buffered in the async HTTP emit queue. Events are dropped (with a warning) when 
the queue is full. Only applies when `transportType=HTTP`. | no | `1000` |
+| `druid.request.logging.emitThreadCount` | Number of background threads used 
to POST events to the HTTP endpoint. Only applies when `transportType=HTTP`. | 
no | `1` |
+| `druid.request.logging.trustStorePath` | Path to the TrustStore file for 
HTTPS transport. Only applies when `transportType=HTTP`. | no | — |
+| `druid.request.logging.trustStorePassword` | Password for the TrustStore. 
Accepts a plain string or a 
[PasswordProvider](../../operations/password-provider.md) (e.g. an environment 
variable). Only applies when `transportType=HTTP`. | no | — |
+| `druid.request.logging.keyStorePath` | Path to the KeyStore file for mutual 
TLS. Only applies when `transportType=HTTP`. | no | — |
+| `druid.request.logging.keyStorePassword` | Password for the KeyStore. 
Accepts a plain string or a 
[PasswordProvider](../../operations/password-provider.md). Only applies when 
`transportType=HTTP`. | no | — |
+
+### Examples
+
+**Console (development)**
+
+```properties
+druid.request.logging.type=openlineage
+druid.request.logging.namespace=druid://broker.prod:8082
+```
+
+**HTTP (production)**
+
+```properties
+druid.request.logging.type=openlineage
+druid.request.logging.namespace=druid://broker.prod:8082
+druid.request.logging.transportType=HTTP
+druid.request.logging.transportUrl=http://marquez:5000/api/v1/lineage
+```
+
+**Combined with another logger using the `composing` provider**
+
+```properties
+druid.request.logging.type=composing
+druid.request.logging.loggerProviders=[{"type":"slf4j"},{"type":"openlineage","namespace":"druid://broker.prod:8082","transportType":"HTTP","transportUrl":"http://marquez:5000/api/v1/lineage"}]
+```
+
+## Event structure
+
+Each emitted event follows the [OpenLineage 
spec](https://openlineage.io/spec/2-0-2/OpenLineage.json) and includes the 
following facets.
+
+### Run facets
+
+| Facet | Description |
+|---|---|
+| `processing_engine` | Engine name (`druid`). Standard OpenLineage facet. |
+| `druid_query_context` | Query metadata: `identity` (authenticated user), 
`remoteAddress`, `queryType`, and `sqlQueryId` (on native sub-queries of SQL, 
for correlation with the parent SQL event). |
+| `druid_query_statistics` | Execution stats: `durationMs`, `bytes`, 
`planningTimeMs`, `statusCode`. |
+| `errorMessage` | Exception message for failed queries. Standard OpenLineage 
facet. |
+
+### Job facets
+
+| Facet | Description |
+|---|---|
+| `jobType` | `processingType=BATCH`, `integration=DRUID`, `jobType=QUERY`. 
Standard OpenLineage facet. |
+| `sql` | Raw SQL text. Present on SQL queries only. Standard OpenLineage 
facet. |
diff --git a/extensions-contrib/openlineage-emitter/pom.xml 
b/extensions-contrib/openlineage-emitter/pom.xml
new file mode 100644
index 00000000000..40129e81233
--- /dev/null
+++ b/extensions-contrib/openlineage-emitter/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>38.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.druid.extensions.contrib</groupId>
+  <artifactId>openlineage-emitter</artifactId>
+  <name>openlineage-emitter</name>
+  <description>
+    OpenLineage data lineage emitter for Apache Druid. Captures table-level 
lineage from
+    native and SQL queries and emits OpenLineage RunEvents to a console or 
HTTP endpoint.
+
+    Configure via druid.request.logging.type=openlineage.
+    Input datasources are resolved from the native query plan; the SQL parser 
(calcite-core,
+    present on Broker nodes) is used only to extract the output datasource of 
MSQ INSERT/REPLACE.
+  </description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-sql</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite.avatica</groupId>
+      <artifactId>avatica-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>jakarta.validation</groupId>
+      <artifactId>jakarta.validation-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.owasp</groupId>
+        <artifactId>dependency-check-maven</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java
 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java
new file mode 100644
index 00000000000..1a3739b1c04
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.sql.calcite.parser.DruidSqlParser;
+import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
+import org.apache.druid.utils.CloseableUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * OpenLineage RunEvents for completed Druid queries.
+ */
+public class OpenLineageRequestLogger implements RequestLogger
+{
+  private static final Logger log = new Logger(OpenLineageRequestLogger.class);
+
+  // Written as "producer" on every event and as "_producer" on every facet.
+  private static final String PRODUCER =
+      "https://github.com/apache/druid/tree/master/extensions-contrib/openlineage-emitter";
+  private static final String SCHEMA_URL =
+      "https://openlineage.io/spec/2-0-2/OpenLineage.json";
+  private static final String ENGINE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json";
+  private static final String ERROR_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-0/ErrorMessageRunFacet.json";
+  private static final String JOB_TYPE_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json";
+  private static final String SQL_FACET_SCHEMA_URL =
+      "https://openlineage.io/spec/facets/1-0-1/SQLJobFacet.json";
+  // raw.githubusercontent.com so consumers can dereference the URL and receive JSON, not an HTML tree page.
+  private static final String CUSTOM_SCHEMA_BASE =
+      "https://raw.githubusercontent.com/apache/druid/master/extensions-contrib/openlineage-emitter"
+      + "/src/main/resources/openlineage-schema/";
+  private static final String CONTEXT_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryContextRunFacet.json";
+  private static final String STATS_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryStatisticsRunFacet.json";
+  // Maximum number of chars of SQL text kept in the sql job facet; longer text is truncated.
+  static final int SQL_FACET_MAX_LENGTH = 64 * 1024;
+  static final int DEFAULT_EMIT_QUEUE_CAPACITY = 1000;
+  static final int DEFAULT_EMIT_THREAD_COUNT = 1;
+  // Dropped events are warned about on the first drop and then once per this many drops.
+  private static final int DISCARD_WARNING_INTERVAL = 1000;
+  /**
+   * Number of retries after the initial send attempt, i.e. each event gets up to
+   * 1 + MAX_SEND_RETRIES attempts. Delivery is at-most-once after all attempts are
+   * exhausted: if every attempt fails the event is dropped and a warning is logged.
+   */
+  static final int MAX_SEND_RETRIES = 2;
+  // NOTE(review): presumably the pause between send attempts; the send loop is outside this view.
+  private static final long RETRY_SLEEP_MS = 500;
+
+  // Fallback query ID used when a request log line carries no query ID.
+  static final String UNKNOWN_QUERY_ID = "unknown-query-id";
+
+  private final ObjectMapper jsonMapper;
+  private final String namespace;
+  private final OpenLineageRequestLoggerProvider.TransportType transportType;
+  @Nullable
+  private final String transportUrl;
+  private final Set<String> excludedNativeQueryTypes;
+  // Both are null unless transportType is HTTP.
+  @Nullable
+  private final HttpClient httpClient;
+  @Nullable
+  private final ExecutorService emitExecutor;
+  private final AtomicLong discardedEventCount = new AtomicLong(0);
+
+  /**
+   * Creates a logger with the default emit queue capacity, the default emit thread count and
+   * no caller-supplied HTTP client, by delegating to the full constructor.
+   */
+  public OpenLineageRequestLogger(
+      ObjectMapper jsonMapper,
+      String namespace,
+      OpenLineageRequestLoggerProvider.TransportType transportType,
+      @Nullable String transportUrl,
+      Set<String> excludedNativeQueryTypes
+  )
+  {
+    this(jsonMapper, namespace, transportType, transportUrl, 
excludedNativeQueryTypes,
+         DEFAULT_EMIT_QUEUE_CAPACITY, DEFAULT_EMIT_THREAD_COUNT, null);
+  }
+
+  /**
+   * Creates a logger and, for the HTTP transport only, the HTTP client and the bounded
+   * executor used to emit events. For any other transport both are left null.
+   *
+   * @param jsonMapper               mapper used to create the JSON nodes of each event
+   * @param namespace                namespace written on job and dataset nodes
+   * @param transportType            where events are sent
+   * @param transportUrl             endpoint URL; must be non-null when transportType is HTTP
+   * @param excludedNativeQueryTypes native query types skipped by {@link #logNativeQuery}
+   * @param emitQueueCapacity        capacity of the bounded queue backing the emit executor (HTTP only)
+   * @param emitThreadCount          core and maximum pool size of the emit executor (HTTP only)
+   * @param httpClient               client to use for HTTP; a default client is built when null (HTTP only)
+   * @throws IllegalStateException if transportType is HTTP and transportUrl is null
+   */
+  public OpenLineageRequestLogger(
+      ObjectMapper jsonMapper,
+      String namespace,
+      OpenLineageRequestLoggerProvider.TransportType transportType,
+      @Nullable String transportUrl,
+      Set<String> excludedNativeQueryTypes,
+      int emitQueueCapacity,
+      int emitThreadCount,
+      @Nullable HttpClient httpClient
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.namespace = namespace;
+    this.transportType = transportType;
+    this.transportUrl = transportUrl;
+    this.excludedNativeQueryTypes = excludedNativeQueryTypes;
+    // Validated here rather than in start(): start() may not run for every configuration.
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP 
&& transportUrl == null) {
+      throw new IllegalStateException(
+          "druid.request.logging.transportUrl must be set when 
transportType=HTTP"
+      );
+    }
+    if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+      this.httpClient = httpClient != null ? httpClient : 
HttpClientBuilder.create().build();
+      // Bounded queue: if the queue is full, drop the event rather than blocking the query thread.
+      // A warning is logged on the first drop and every DISCARD_WARNING_INTERVAL drops thereafter.
+      this.emitExecutor = new ThreadPoolExecutor(
+          emitThreadCount,
+          emitThreadCount,
+          60L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(emitQueueCapacity),
+          Execs.makeThreadFactory("OpenLineageEmitter-%d"),
+          new DiscardWithWarningPolicy(discardedEventCount)
+      );
+    } else {
+      this.httpClient = null;
+      this.emitExecutor = null;
+    }
+  }
+
+  // Logs which transport is in use (and its URL, when one is configured). Performs no validation.
+  //
+  // Note: ComposingRequestLogger does not delegate @LifecycleStart to sub-loggers, so this method
+  // may not be called when used in a composing configuration. HTTP URL validation is therefore
+  // performed in the constructor instead. This method is retained for direct lifecycle use.
+  @LifecycleStart
+  @Override
+  public void start()
+  {
+    log.info(
+        "Started OpenLineage %s transport%s",
+        transportType,
+        transportUrl != null ? " to [" + transportUrl + "]" : ""
+    );
+  }
+
+  /**
+   * Shuts down the emit executor, if one exists, and closes the HTTP client when it is
+   * {@link Closeable}. Already-queued work gets up to 5 seconds to finish before the
+   * executor is forcibly shut down.
+   */
+  @LifecycleStop
+  @Override
+  public void stop()
+  {
+    if (emitExecutor != null) {
+      emitExecutor.shutdown();
+      try {
+        if (!emitExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          emitExecutor.shutdownNow();
+        }
+      }
+      catch (InterruptedException e) {
+        // Force shutdown and restore the interrupt flag for the caller.
+        emitExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (httpClient instanceof Closeable) {
+      CloseableUtils.closeAndSuppressExceptions(
+          (Closeable) httpClient,
+          e -> log.warn(e, "Failed to close OpenLineage HTTP client")
+      );
+    }
+    log.info("Stopped OpenLineage request logger");
+  }
+
+  /**
+   * Emits a run event for a native query, with the query's datasource table names as inputs
+   * and no output. Does nothing when the log line has no query or when the query type is in
+   * the excluded set. A missing query ID is replaced by {@link #UNKNOWN_QUERY_ID}.
+   */
+  @Override
+  public void logNativeQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    if (requestLogLine.getQuery() == null) {
+      return;
+    }
+
+    String queryType = requestLogLine.getQuery().getType();
+
+    if (excludedNativeQueryTypes.contains(queryType)) {
+      return;
+    }
+
+    // Copy through a LinkedHashSet so the input list has no duplicates and keeps iteration order.
+    List<String> inputs = new ArrayList<>(new 
LinkedHashSet<>(requestLogLine.getQuery().getDataSource().getTableNames()));
+    String queryId = requestLogLine.getQuery().getId();
+    if (queryId == null) {
+      log.debug("Native query reached OpenLineage logger without a query ID");
+      queryId = UNKNOWN_QUERY_ID;
+    }
+
+    emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, null));
+  }
+
+  /**
+   * Emits lineage for MSQ DML statements (INSERT INTO / REPLACE INTO). For native-engine
+   * SQL SELECT queries, lineage is emitted from {@link #logNativeQuery} instead, which has
+   * structured access to datasource references without requiring SQL parsing.
+   *
+   * <p>MSQ INSERT/REPLACE queries submit an MSQControllerTask and never produce a native
+   * request-log event, so their output lineage must be captured here. The output datasource
+   * is extracted from the SQL AST via Druid's SQL parser; inputs are not emitted because
+   * reliably extracting FROM/JOIN tables in the logger layer would duplicate planner work.
+   *
+   * <p>Nothing is emitted when the log line has no SQL text or when no output datasource can
+   * be extracted from it. When the SQL query context has no {@code sqlQueryId}, the event is
+   * emitted under {@link #UNKNOWN_QUERY_ID}.
+   */
+  @Override
+  public void logSqlQuery(RequestLogLine requestLogLine) throws IOException
+  {
+    String sql = requestLogLine.getSql();
+    if (sql == null) {
+      return;
+    }
+    String outputTable = extractMsqOutputDatasource(sql);
+    if (outputTable == null) {
+      return;
+    }
+
+    Map<String, Object> sqlContext = requestLogLine.getSqlQueryContext();
+    // Context values are untyped: convert with toString() instead of casting, so an unexpected
+    // non-String value cannot throw ClassCastException out of the request-logging path.
+    Object sqlQueryId = sqlContext != null ? sqlContext.get("sqlQueryId") : null;
+    String queryId;
+    if (sqlQueryId == null) {
+      log.debug("MSQ SQL query reached OpenLineage logger without a sqlQueryId");
+      queryId = UNKNOWN_QUERY_ID;
+    } else {
+      queryId = sqlQueryId.toString();
+    }
+
+    emit(buildRunEvent(queryId, "msq", requestLogLine, List.of(), outputTable));
+  }
+
+  /**
+   * Extracts the output datasource name from an MSQ DML statement (INSERT INTO / REPLACE INTO)
+   * using Druid's SQL parser. Returns {@code null} for any input that is not a Druid
+   * INSERT/REPLACE into a regular datasource — including:
+   * <ul>
+   *   <li>SELECT and other non-DML statements</li>
+   *   <li>{@code INSERT INTO EXTERN(...) AS CSV ...} export statements (the target is an
+   *       {@code ExternalDestinationSqlIdentifier})</li>
+   *   <li>targets that are not a {@code SqlIdentifier}, or whose name list is empty</li>
+   *   <li>SQL that fails to parse (any exception is logged at debug level and swallowed)</li>
+   * </ul>
+   *
+   * <p>For {@code INSERT INTO druid.foo} or {@code INSERT INTO catalog.druid.foo}, returns just
+   * {@code foo} — the last component of the identifier. A statement that parses as a
+   * {@code SqlWith} is unwrapped to its body first; in the Druid MSQ form
+   * {@code INSERT INTO foo WITH x AS (...) SELECT ...} the target table is read from the outer
+   * {@code SqlInsert} node directly.
+   *
+   * @param sql SQL text, optionally preceded by SET statements
+   * @return the bare output datasource name, or {@code null} when none can be determined
+   */
+  @Nullable
+  static String extractMsqOutputDatasource(String sql)
+  {
+    try {
+      // allowSetStatements=true so that a Druid SET preamble (e.g. "SET sqlQueryId = '...'; INSERT
+      // INTO foo ...") doesn't cause the parse to throw. We only inspect the main statement.
+      SqlNode node = DruidSqlParser.parse(sql, true).getMainStatement();
+      // WITH ... INSERT/REPLACE wraps the ingest with a SqlWith; unwrap it.
+      if (node instanceof SqlWith) {
+        node = ((SqlWith) node).body;
+      }
+      // DruidSqlInsert and DruidSqlReplace both extend SqlInsert, so this covers both.
+      if (!(node instanceof SqlInsert)) {
+        return null;
+      }
+      SqlNode target = ((SqlInsert) node).getTargetTable();
+      // INSERT INTO EXTERN(...) AS CSV writes to a file, not a Druid datasource.
+      // ExternalDestinationSqlIdentifier extends SqlIdentifier, so check it explicitly
+      // before the generic SqlIdentifier handling below.
+      if (target instanceof ExternalDestinationSqlIdentifier) {
+        return null;
+      }
+      if (!(target instanceof SqlIdentifier)) {
+        return null;
+      }
+      SqlIdentifier id = (SqlIdentifier) target;
+      // Druid's planner normalizes any schema/catalog prefix (e.g. "druid.foo",
+      // "catalog.druid.foo") to the bare datasource name. Match that here.
+      return id.names.isEmpty() ? null : id.names.get(id.names.size() - 1);
+    }
+    catch (Exception e) {
+      // Best-effort: a parse failure only means no output lineage is emitted for this statement.
+      log.debug(e, "Failed to parse SQL for MSQ output datasource extraction; 
skipping output lineage");
+      return null;
+    }
+  }
+
+  /**
+   * Assembles one OpenLineage run event. The event type is COMPLETE when the "success" stat
+   * is {@code Boolean.TRUE} and FAIL otherwise (including when the stat is absent).
+   *
+   * @param queryId        ID used for the job name and to derive the run ID
+   * @param queryType      query type recorded in the query context facet
+   * @param requestLogLine source of the timestamp, SQL text, stats and remote address
+   * @param inputs         input dataset names
+   * @param output         single output dataset name, or null for no outputs
+   * @return the event as a JSON object node
+   */
+  private ObjectNode buildRunEvent(
+      String queryId,
+      String queryType,
+      RequestLogLine requestLogLine,
+      List<String> inputs,
+      @Nullable String output
+  )
+  {
+    Map<String, Object> stats = requestLogLine.getQueryStats().getStats();
+    boolean success = Boolean.TRUE.equals(stats.get("success"));
+
+    ObjectNode event = jsonMapper.createObjectNode();
+    event.put("eventType", success ? "COMPLETE" : "FAIL");
+    event.put("eventTime", 
requestLogLine.getTimestamp().toInstant().toString());
+    event.put("producer", PRODUCER);
+    event.put("schemaURL", SCHEMA_URL);
+    event.set("run", buildRun(queryId, queryType, requestLogLine, stats, 
success));
+    event.set("job", buildJob(queryId, requestLogLine.getSql()));
+    event.set("inputs", buildDatasets(inputs));
+    event.set("outputs", buildDatasets(output != null ? List.of(output) : 
List.of()));
+    return event;
+  }
+
+  /**
+   * Builds the "run" node of an event: a run ID plus the run facets.
+   *
+   * <p>The run ID is a name-based UUID computed from the UTF-8 bytes of {@code queryId}, so
+   * the same query ID always maps to the same run ID. Facets written:
+   * <ul>
+   *   <li>{@code processing_engine}: name "druid" and the Druid version</li>
+   *   <li>{@code druid_query_context}: queryType, remoteAddress, identity (when the stats
+   *       carry one) and sqlQueryId (when the native query context carries one)</li>
+   *   <li>{@code druid_query_statistics}: durationMs, bytes, planningTimeMs and statusCode
+   *       (when the stats carry one)</li>
+   *   <li>{@code errorMessage}: only for unsuccessful queries whose stats carry an
+   *       "exception" entry</li>
+   * </ul>
+   */
+  private ObjectNode buildRun(
+      String queryId,
+      String queryType,
+      RequestLogLine requestLogLine,
+      Map<String, Object> stats,
+      boolean success
+  )
+  {
+    ObjectNode run = jsonMapper.createObjectNode();
+    run.put("runId", 
UUID.nameUUIDFromBytes(queryId.getBytes(StandardCharsets.UTF_8)).toString());
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode engineFacet = createFacet(ENGINE_FACET_SCHEMA_URL);
+    engineFacet.put("name", "druid");
+    engineFacet.put("version", getDruidVersion());
+    facets.set("processing_engine", engineFacet);
+
+    ObjectNode contextFacet = createFacet(CONTEXT_FACET_SCHEMA_URL);
+    contextFacet.put("queryType", queryType);
+    contextFacet.put("remoteAddress", requestLogLine.getRemoteAddr());
+    Object identity = stats.get("identity");
+    if (identity != null) {
+      contextFacet.put("identity", identity.toString());
+    }
+    // For native sub-queries of SQL, include the parent SQL query ID for correlation.
+    // Log lines without a native query (getQuery() == null) never get a sqlQueryId here.
+    Object sqlQueryId = requestLogLine.getQuery() != null
+        ? requestLogLine.getQuery().getContext().get(BaseQuery.SQL_QUERY_ID) : 
null;
+    if (sqlQueryId != null) {
+      contextFacet.put("sqlQueryId", sqlQueryId.toString());
+    }
+    facets.set("druid_query_context", contextFacet);
+
+    ObjectNode statsFacet = createFacet(STATS_FACET_SCHEMA_URL);
+    // NOTE(review): putLongStat is defined outside this view; presumably the trailing
+    // arguments are candidate stat keys tried in order (SQL key first, native key second).
+    putLongStat(statsFacet, "durationMs", stats, "sqlQuery/time", 
"query/time");
+    putLongStat(statsFacet, "bytes", stats, "sqlQuery/bytes", "query/bytes");
+    putLongStat(statsFacet, "planningTimeMs", stats, 
"sqlQuery/planningTimeMs");
+    Object statusCode = stats.get("statusCode");
+    if (statusCode != null) {
+      statsFacet.put("statusCode", statusCode.toString());
+    }
+    facets.set("druid_query_statistics", statsFacet);
+
+    if (!success) {
+      Object exception = stats.get("exception");
+      if (exception != null) {
+        ObjectNode errorFacet = createFacet(ERROR_FACET_SCHEMA_URL);
+        errorFacet.put("message", exception.toString());
+        // programmingLanguage is only set when the native query context carried a sqlQueryId.
+        if (sqlQueryId != null) {
+          errorFacet.put("programmingLanguage", "SQL");
+        }
+        facets.set("errorMessage", errorFacet);
+      }
+    }
+
+    run.set("facets", facets);
+    return run;
+  }
+
+  /**
+   * Builds the OpenLineage {@code job} object: the configured namespace, the query ID as the job name, a
+   * {@code jobType} facet and, when SQL text is supplied, a {@code sql} facet.
+   *
+   * <p>SQL text longer than {@code SQL_FACET_MAX_LENGTH} characters is truncated and a warning is logged.
+   * The cut never falls between the two halves of a surrogate pair.
+   *
+   * @param queryId query identifier, used as the job name and in the truncation warning
+   * @param sql     SQL text of the query, or null when no {@code sql} facet should be attached
+   * @return the populated job node
+   */
+  private ObjectNode buildJob(String queryId, @Nullable String sql)
+  {
+    ObjectNode job = jsonMapper.createObjectNode();
+    job.put("namespace", namespace);
+    job.put("name", queryId);
+
+    ObjectNode facets = jsonMapper.createObjectNode();
+
+    ObjectNode jobTypeFacet = createFacet(JOB_TYPE_FACET_SCHEMA_URL);
+    jobTypeFacet.put("processingType", "BATCH");
+    jobTypeFacet.put("integration", "DRUID");
+    jobTypeFacet.put("jobType", "QUERY");
+    facets.set("jobType", jobTypeFacet);
+
+    if (sql != null) {
+      ObjectNode sqlFacet = createFacet(SQL_FACET_SCHEMA_URL);
+      if (sql.length() > SQL_FACET_MAX_LENGTH) {
+        // The limit is compared against String.length(), i.e. UTF-16 code units, not encoded bytes.
+        log.warn(
+            "SQL text for query [%s] exceeds [%,d] characters and will be truncated in the sql job facet",
+            queryId,
+            SQL_FACET_MAX_LENGTH
+        );
+        int end = SQL_FACET_MAX_LENGTH;
+        // Step back one unit if the cut would otherwise leave a dangling high surrogate.
+        if (end > 0 && Character.isHighSurrogate(sql.charAt(end - 1))) {
+          end--;
+        }
+        sqlFacet.put("query", sql.substring(0, end));
+      } else {
+        sqlFacet.put("query", sql);
+      }
+      facets.set("sql", sqlFacet);
+    }
+
+    job.set("facets", facets);
+    return job;
+  }
+
+  private ObjectNode createFacet(@Nullable String schemaUrl)
+  {
+    ObjectNode facet = jsonMapper.createObjectNode();
+    facet.put("_producer", PRODUCER);
+    if (schemaUrl != null) {
+      facet.put("_schemaURL", schemaUrl);
+    }
+    return facet;
+  }
+
+  private ArrayNode buildDatasets(List<String> tableNames)
+  {
+    ArrayNode array = jsonMapper.createArrayNode();
+    for (String name : tableNames) {
+      ObjectNode node = jsonMapper.createObjectNode();
+      node.put("namespace", namespace);
+      node.put("name", name);
+      node.set("facets", jsonMapper.createObjectNode());
+      array.add(node);
+    }
+    return array;
+  }
+
+
+  protected void emit(ObjectNode event)
+  {
+    try {
+      String json = jsonMapper.writeValueAsString(event);
+      if (transportType == 
OpenLineageRequestLoggerProvider.TransportType.HTTP) {
+        emitExecutor.submit(() -> emitHttp(json));
+      } else {
+        log.debug("OpenLineage event: %s", json);
+      }
+    }
+    catch (IOException e) {
+      log.error(e, "Failed to serialize OpenLineage event");
+    }
+  }
+
+  /**
+   * POSTs one serialized event to {@code transportUrl}, making up to {@code MAX_SEND_RETRIES + 1} attempts
+   * and sleeping {@code RETRY_SLEEP_MS} before each retry.
+   *
+   * <p>An attempt is retried after an {@link IOException} or a non-2xx response. An interrupt during the
+   * retry sleep, or any unexpected {@link RuntimeException}, drops the event immediately with a warning.
+   * This method never throws: {@code emit} runs it through {@code emitExecutor.submit}, so an escaping
+   * exception would be captured by the returned Future and never reported.
+   *
+   * @param json the serialized OpenLineage event
+   */
+  private void emitHttp(String json)
+  {
+    IOException lastException = null;
+    int lastStatusCode = -1;
+
+    for (int attempt = 0; attempt <= MAX_SEND_RETRIES; attempt++) {
+      if (attempt > 0) {
+        try {
+          Thread.sleep(RETRY_SLEEP_MS);
+        }
+        catch (InterruptedException ie) {
+          // Restore the interrupt flag for the owning executor and give up on this event.
+          Thread.currentThread().interrupt();
+          log.warn("OpenLineage HTTP emit interrupted on retry [%d]; dropping event", attempt);
+          return;
+        }
+      }
+
+      HttpPost post = null;
+      try {
+        // Constructed inside the try: a malformed transportUrl makes the constructor throw.
+        post = new HttpPost(transportUrl);
+        post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
+        org.apache.http.HttpResponse response = httpClient.execute(post);
+        lastStatusCode = response.getStatusLine().getStatusCode();
+        EntityUtils.consumeQuietly(response.getEntity());
+        if (lastStatusCode >= 200 && lastStatusCode < 300) {
+          return; // success
+        }
+        // Non-2xx: retry (server-side error may be transient)
+        log.debug(
+            "OpenLineage HTTP attempt [%d/%d] received non-2xx [%d] from [%s]",
+            attempt + 1,
+            MAX_SEND_RETRIES + 1,
+            lastStatusCode,
+            transportUrl
+        );
+        lastException = null;
+      }
+      catch (IOException e) {
+        lastException = e;
+        log.debug(
+            e,
+            "OpenLineage HTTP attempt [%d/%d] failed posting to [%s]",
+            attempt + 1,
+            MAX_SEND_RETRIES + 1,
+            transportUrl
+        );
+      }
+      catch (RuntimeException e) {
+        // Unexpected failure (e.g. invalid URL); retrying cannot help, so report it and drop the event.
+        log.warn(e, "OpenLineage event dropped: unexpected error while posting to [%s]", transportUrl);
+        return;
+      }
+      finally {
+        if (post != null) {
+          post.releaseConnection();
+        }
+      }
+    }
+
+    // All attempts exhausted — delivery guarantee is at-most-once.
+    if (lastException != null) {
+      log.warn(
+          lastException,
+          "OpenLineage event dropped: all [%d] attempts to POST to [%s] failed with an exception",
+          MAX_SEND_RETRIES + 1,
+          transportUrl
+      );
+    } else {
+      log.warn(
+          "OpenLineage event dropped: all [%d] attempts to POST to [%s] returned non-2xx status [%d]",
+          MAX_SEND_RETRIES + 1,
+          transportUrl,
+          lastStatusCode
+      );
+    }
+  }
+
+  private void putLongStat(ObjectNode node, String targetKey, Map<String, 
Object> stats, String... sourceKeys)
+  {
+    for (String key : sourceKeys) {
+      Object val = stats.get(key);
+      if (val instanceof Number) {
+        node.put(targetKey, ((Number) val).longValue());
+        return;
+      }
+    }
+  }
+
+  private static String getDruidVersion()
+  {
+    String v = 
OpenLineageRequestLogger.class.getPackage().getImplementationVersion();
+    return v != null ? v : "unknown";
+  }
+
+  /**
+   * {@link RejectedExecutionHandler} that drops the rejected task and counts it. A warning with the running
+   * total is logged for the first drop and then each time the total is a multiple of
+   * {@link #DISCARD_WARNING_INTERVAL}.
+   */
+  private static class DiscardWithWarningPolicy implements RejectedExecutionHandler
+  {
+    /** Running total of discarded events; supplied by the creator of this policy. */
+    private final AtomicLong discardedCount;
+
+    DiscardWithWarningPolicy(AtomicLong discardedCount)
+    {
+      this.discardedCount = discardedCount;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
+    {
+      final long totalDiscarded = discardedCount.incrementAndGet();
+      // Stay quiet unless this is the very first drop or the total has reached a reporting boundary.
+      if (totalDiscarded != 1 && totalDiscarded % DISCARD_WARNING_INTERVAL != 0) {
+        return;
+      }
+      log.warn("OpenLineage emit queue full, discarded [%,d] events total", totalDiscarded);
+    }
+  }
+
+}
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerModule.java
 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerModule.java
new file mode 100644
index 00000000000..9f6ca4e4dd6
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerModule.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid module for the OpenLineage request logger. It adds no Guice bindings; its only job is to register
+ * {@link OpenLineageRequestLoggerProvider} as a Jackson subtype.
+ */
+public class OpenLineageRequestLoggerModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    // Intentionally empty: no bindings are contributed.
+  }
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    final SimpleModule jacksonModule = new SimpleModule("OpenLineageRequestLoggerModule");
+    jacksonModule.registerSubtypes(OpenLineageRequestLoggerProvider.class);
+    return Collections.singletonList(jacksonModule);
+  }
+}
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java
 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java
new file mode 100644
index 00000000000..30555d3ff34
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.server.log.RequestLoggerProvider;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.util.Set;
+
+/**
+ * {@link RequestLoggerProvider} that creates {@link OpenLineageRequestLogger} instances.
+ *
+ * <p>Configure via {@code druid.request.logging.type=openlineage} in {@code runtime.properties}.
+ * With the {@link TransportType#HTTP} transport a {@code transportUrl} is mandatory; a trust store and/or
+ * key store may optionally be supplied to configure TLS.
+ */
+@JsonTypeName("openlineage")
+public class OpenLineageRequestLoggerProvider implements RequestLoggerProvider
+{
+  private static final Logger log = new Logger(OpenLineageRequestLoggerProvider.class);
+
+  /** Where emitted events are sent. */
+  public enum TransportType
+  {
+    CONSOLE,
+    HTTP
+  }
+
+  @JacksonInject
+  @Json
+  @NotNull
+  private ObjectMapper jsonMapper;
+
+  @JsonProperty
+  @NotNull
+  private String namespace = "druid://" + DruidNode.getDefaultHost();
+
+  @JsonProperty
+  @NotNull
+  private TransportType transportType = TransportType.CONSOLE;
+
+  @Nullable
+  @JsonProperty
+  private String transportUrl;
+
+  @JsonProperty
+  @NotNull
+  private Set<String> excludedNativeQueryTypes = Set.of(
+      "segmentMetadata",
+      "dataSourceMetadata",
+      "timeBoundary"
+  );
+
+  @JsonProperty
+  private int emitQueueCapacity = OpenLineageRequestLogger.DEFAULT_EMIT_QUEUE_CAPACITY;
+
+  @JsonProperty
+  private int emitThreadCount = OpenLineageRequestLogger.DEFAULT_EMIT_THREAD_COUNT;
+
+  @Nullable
+  @JsonProperty
+  private String trustStorePath;
+
+  @Nullable
+  @JsonProperty
+  private PasswordProvider trustStorePassword;
+
+  @Nullable
+  @JsonProperty
+  private String keyStorePath;
+
+  @Nullable
+  @JsonProperty
+  private PasswordProvider keyStorePassword;
+
+  /**
+   * Creates the request logger from the configured properties. An HTTP client is built only for the
+   * {@link TransportType#HTTP} transport; otherwise null is passed to the logger.
+   *
+   * @throws IllegalStateException if the HTTP transport is selected without a {@code transportUrl}, or if
+   *                               TLS configuration fails
+   */
+  @Override
+  public RequestLogger get()
+  {
+    log.debug("Creating OpenLineageRequestLogger [namespace=%s, transport=%s]", namespace, transportType);
+    final HttpClient httpClient;
+    if (transportType == TransportType.HTTP) {
+      // Fail at configuration time rather than on every emitted event.
+      if (transportUrl == null || transportUrl.trim().isEmpty()) {
+        throw new IllegalStateException("transportUrl must be set when transportType is HTTP");
+      }
+      httpClient = buildHttpClient();
+    } else {
+      httpClient = null;
+    }
+    return new OpenLineageRequestLogger(
+        jsonMapper,
+        namespace,
+        transportType,
+        transportUrl,
+        excludedNativeQueryTypes,
+        emitQueueCapacity,
+        emitThreadCount,
+        httpClient
+    );
+  }
+
+  /**
+   * Builds the HTTP client with a 5s connect timeout, 10s socket timeout and 5s connection-request timeout.
+   * When neither a trust store nor a key store path is configured, no custom SSL context is set on the
+   * builder; otherwise a TLS context is initialised from whichever stores are configured.
+   */
+  private HttpClient buildHttpClient()
+  {
+    RequestConfig requestConfig = RequestConfig.custom()
+                                               .setConnectTimeout(5000)
+                                               .setSocketTimeout(10000)
+                                               .setConnectionRequestTimeout(5000)
+                                               .build();
+    HttpClientBuilder builder = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig);
+    if (trustStorePath == null && keyStorePath == null) {
+      return builder.build();
+    }
+    try {
+      TrustManagerFactory tmf = null;
+      if (trustStorePath != null) {
+        KeyStore trustStore = loadKeyStore(trustStorePath, resolvePassword(trustStorePassword));
+        tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(trustStore);
+      }
+      KeyManagerFactory kmf = null;
+      if (keyStorePath != null) {
+        // Resolve once to avoid inconsistent values if the provider is dynamic.
+        char[] keyPwChars = resolvePassword(keyStorePassword);
+        KeyStore keyStore = loadKeyStore(keyStorePath, keyPwChars);
+        kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(keyStore, keyPwChars);
+      }
+      SSLContext sslContext = SSLContext.getInstance("TLS");
+      // Null key or trust managers are passed through when the corresponding store is not configured.
+      sslContext.init(
+          kmf != null ? kmf.getKeyManagers() : null,
+          tmf != null ? tmf.getTrustManagers() : null,
+          null
+      );
+      return builder.setSSLContext(sslContext).build();
+    }
+    catch (Exception e) {
+      throw new IllegalStateException("Failed to configure TLS for OpenLineage HTTP transport", e);
+    }
+  }
+
+  /**
+   * Resolves a password provider to a char array. Returns null when the provider is absent or yields null,
+   * which is treated as "no password".
+   */
+  @Nullable
+  private static char[] resolvePassword(@Nullable PasswordProvider provider)
+  {
+    String raw = provider != null ? provider.getPassword() : null;
+    return raw != null ? raw.toCharArray() : null;
+  }
+
+  /** Loads a key store of the JVM default type from {@code path} using the given (possibly null) password. */
+  private static KeyStore loadKeyStore(String path, @Nullable char[] password) throws Exception
+  {
+    try (FileInputStream in = new FileInputStream(new File(path))) {
+      KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
+      store.load(in, password);
+      return store;
+    }
+  }
+}
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 
b/extensions-contrib/openlineage-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 00000000000..6376246f66a
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.extensions.openlineage.OpenLineageRequestLoggerModule
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryContextRunFacet.json
 
b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryContextRunFacet.json
new file mode 100644
index 00000000000..83a6b34ac0d
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryContextRunFacet.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://raw.githubusercontent.com/apache/druid/master/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryContextRunFacet.json",
+  "title": "DruidQueryContextRunFacet",
+  "description": "Druid-specific run facet capturing query context metadata.",
+  "type": "object",
+  "allOf": [
+    {"$ref": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunFacet"}
+  ],
+  "properties": {
+    "queryType": {
+      "type": "string",
+      "description": "The Druid query type (e.g. 'groupBy', 'topN', 'scan', 
'msq')."
+    },
+    "remoteAddress": {
+      "type": "string",
+      "description": "IP address of the client that submitted the query."
+    },
+    "identity": {
+      "type": "string",
+      "description": "Authenticated identity of the query submitter, if 
available."
+    },
+    "sqlQueryId": {
+      "type": "string",
+      "description": "For native sub-queries of SQL, the parent SQL query ID 
for correlation."
+    }
+  },
+  "required": ["queryType", "remoteAddress"]
+}
diff --git 
a/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryStatisticsRunFacet.json
 
b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryStatisticsRunFacet.json
new file mode 100644
index 00000000000..b9b1e297614
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryStatisticsRunFacet.json
@@ -0,0 +1,28 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://raw.githubusercontent.com/apache/druid/master/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidQueryStatisticsRunFacet.json",
+  "title": "DruidQueryStatisticsRunFacet",
+  "description": "Druid-specific run facet capturing query execution 
statistics.",
+  "type": "object",
+  "allOf": [
+    {"$ref": 
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunFacet"}
+  ],
+  "properties": {
+    "durationMs": {
+      "type": "integer",
+      "description": "Total query execution time in milliseconds."
+    },
+    "bytes": {
+      "type": "integer",
+      "description": "Number of bytes returned by the query."
+    },
+    "planningTimeMs": {
+      "type": "integer",
+      "description": "SQL planning time in milliseconds. Only present on SQL 
queries."
+    },
+    "statusCode": {
+      "type": "string",
+      "description": "HTTP status code of the query response."
+    }
+  }
+}
diff --git 
a/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java
 
b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java
new file mode 100644
index 00000000000..0a5c2d1bd57
--- /dev/null
+++ 
b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.extensions.openlineage;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class OpenLineageRequestLoggerTest
+{
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
+  private static final String NAMESPACE = "druid://test:8082";
+  private static final DateTime TIMESTAMP = 
DateTimes.of("2024-01-01T00:00:00Z");
+  private static final String REMOTE_ADDR = "10.0.0.1";
+  private static final Set<String> DEFAULT_EXCLUDED_NATIVE_QUERY_TYPES = 
Set.of(
+      "segmentMetadata",
+      "dataSourceMetadata",
+      "timeBoundary"
+  );
+
+  private List<ObjectNode> capturedEvents;
+  private OpenLineageRequestLogger logger;
+
+  /** Resets the capture buffer and creates a logger with the default excluded native query types. */
+  @BeforeEach
+  public void setUp()
+  {
+    this.capturedEvents = new ArrayList<>();
+    this.logger = createLogger(DEFAULT_EXCLUDED_NATIVE_QUERY_TYPES);
+  }
+
+  private OpenLineageRequestLogger createLogger(Set<String> 
excludedNativeQueryTypes)
+  {
+    return new OpenLineageRequestLogger(
+        MAPPER,
+        NAMESPACE,
+        OpenLineageRequestLoggerProvider.TransportType.CONSOLE,
+        null,
+        excludedNativeQueryTypes
+    )
+    {
+      @Override
+      protected void emit(ObjectNode event)
+      {
+        capturedEvents.add(event);
+      }
+    };
+  }
+
+  // --- logSqlQuery: SELECT is no-op; MSQ DML emits ---
+
+  /** A plain SQL SELECT must not produce any event. */
+  @Test
+  public void testSqlSelectIsNoOp() throws IOException
+  {
+    final RequestLogLine selectLine = sqlLine(
+        "SELECT * FROM \"kttm\"",
+        ImmutableMap.of("sqlQueryId", "qid-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(selectLine);
+
+    Assertions.assertEquals(0, capturedEvents.size());
+  }
+
+  /** A SQL log line without SQL text must be ignored without emitting anything. */
+  @Test
+  public void testLogSqlQueryWithNullSql() throws IOException
+  {
+    final RequestLogLine lineWithoutSql = RequestLogLine.forSql(
+        null,
+        ImmutableMap.of(),
+        TIMESTAMP,
+        REMOTE_ADDR,
+        new QueryStats(ImmutableMap.of())
+    );
+
+    logger.logSqlQuery(lineWithoutSql);
+
+    Assertions.assertEquals(0, capturedEvents.size());
+  }
+
+  /** With no sqlQueryId in the SQL context, the job name falls back to UNKNOWN_QUERY_ID. */
+  @Test
+  public void testMsqInsertMissingSqlQueryId() throws IOException
+  {
+    final RequestLogLine insertLine = sqlLine(
+        "INSERT INTO \"kttm-result\" SELECT * FROM \"kttm\"",
+        ImmutableMap.of(),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String jobName = capturedEvents.get(0).get("job").get("name").asText();
+    Assertions.assertEquals(OpenLineageRequestLogger.UNKNOWN_QUERY_ID, jobName);
+  }
+
+  /** A null SQL query context also makes the job name fall back to UNKNOWN_QUERY_ID. */
+  @Test
+  public void testMsqInsertNullContext() throws IOException
+  {
+    final RequestLogLine insertLine = RequestLogLine.forSql(
+        "INSERT INTO \"kttm-result\" SELECT * FROM \"kttm\"",
+        null,
+        TIMESTAMP,
+        REMOTE_ADDR,
+        new QueryStats(ImmutableMap.of("success", true))
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String jobName = capturedEvents.get(0).get("job").get("name").asText();
+    Assertions.assertEquals(OpenLineageRequestLogger.UNKNOWN_QUERY_ID, jobName);
+  }
+
+  /**
+   * A successful INSERT emits one COMPLETE event whose output is the target table, with the msq query
+   * type, the original SQL text in the sql facet and the duration statistic.
+   */
+  @Test
+  public void testMsqInsertEmitsOutputLineage() throws IOException
+  {
+    final String insertSql = "INSERT INTO \"kttm-result\" SELECT * FROM \"kttm\"";
+    logger.logSqlQuery(sqlLine(
+        insertSql,
+        ImmutableMap.of("sqlQueryId", "msq-insert-1"),
+        ImmutableMap.of("success", true, "sqlQuery/time", 1200L, "sqlQuery/bytes", 4096L)
+    ));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final ObjectNode emitted = capturedEvents.get(0);
+
+    Assertions.assertEquals("COMPLETE", emitted.get("eventType").asText());
+
+    // Exactly one output dataset: the INSERT target.
+    Assertions.assertEquals(1, emitted.get("outputs").size());
+    Assertions.assertEquals("kttm-result", emitted.get("outputs").get(0).get("name").asText());
+
+    // No input datasets are expected on this event.
+    Assertions.assertEquals(0, emitted.get("inputs").size());
+
+    // The context facet reports the msq query type.
+    Assertions.assertEquals(
+        "msq",
+        emitted.get("run").get("facets").get("druid_query_context").get("queryType").asText()
+    );
+
+    // The sql job facet carries the SQL text unchanged.
+    Assertions.assertEquals(insertSql, emitted.get("job").get("facets").get("sql").get("query").asText());
+
+    // Duration comes from the sqlQuery/time stat.
+    Assertions.assertEquals(
+        1200L,
+        emitted.get("run").get("facets").get("druid_query_statistics").get("durationMs").asLong()
+    );
+  }
+
+  /** REPLACE INTO reports its target table as the output dataset. */
+  @Test
+  public void testMsqReplaceEmitsOutputLineage() throws IOException
+  {
+    final RequestLogLine replaceLine = sqlLine(
+        "REPLACE INTO \"kttm-result\" OVERWRITE ALL SELECT * FROM \"kttm\"",
+        ImmutableMap.of("sqlQueryId", "msq-replace-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(replaceLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm-result", outputName);
+  }
+
+  /** An unquoted INSERT target is reported as the output dataset. */
+  @Test
+  public void testMsqInsertUnquotedTable() throws IOException
+  {
+    final RequestLogLine insertLine = sqlLine(
+        "INSERT INTO kttm_result SELECT count(*) FROM kttm",
+        ImmutableMap.of("sqlQueryId", "msq-unquoted-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm_result", outputName);
+  }
+
+  /** A "druid." schema prefix on the INSERT target is stripped from the reported output name. */
+  @Test
+  public void testMsqInsertDruidSchemaPrefix() throws IOException
+  {
+    // "druid.kttm_result" is expected to be reported as the bare table name "kttm_result".
+    final RequestLogLine insertLine = sqlLine(
+        "INSERT INTO druid.kttm_result SELECT * FROM kttm",
+        ImmutableMap.of("sqlQueryId", "msq-druid-schema-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm_result", outputName);
+  }
+
+  /** When the INSERT source is a WITH (CTE) query, the target table is still reported as the output. */
+  @Test
+  public void testMsqInsertCteUnwrapped() throws IOException
+  {
+    // The CTE is the source of the INSERT here, not a wrapper around it.
+    final RequestLogLine insertLine = sqlLine(
+        "INSERT INTO kttm_result WITH staged AS (SELECT * FROM kttm) SELECT * FROM staged",
+        ImmutableMap.of("sqlQueryId", "msq-cte-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm_result", outputName);
+  }
+
+  /** A three-part target name (catalog.druid.table) is reported by its bare table name. */
+  @Test
+  public void testMsqInsertThreePartCatalogPrefix() throws IOException
+  {
+    final RequestLogLine insertLine = sqlLine(
+        "INSERT INTO catalog.druid.kttm_result SELECT * FROM kttm",
+        ImmutableMap.of("sqlQueryId", "msq-catalog-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm_result", outputName);
+  }
+
+  /** A SET statement preceding the INSERT does not prevent the target table from being reported. */
+  @Test
+  public void testMsqInsertWithSetPreamble() throws IOException
+  {
+    // The SQL text carries a SET preamble followed by the actual INSERT statement.
+    final RequestLogLine insertLine = sqlLine(
+        "SET maxNumTasks = 2;\nINSERT INTO kttm_result SELECT * FROM kttm",
+        ImmutableMap.of("sqlQueryId", "msq-set-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(insertLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final String outputName = capturedEvents.get(0).get("outputs").get(0).get("name").asText();
+    Assertions.assertEquals("kttm_result", outputName);
+  }
+
+  /** INSERT INTO EXTERN(...) targets an external location rather than a datasource; no event is emitted. */
+  @Test
+  public void testMsqInsertExternExportSkipped() throws IOException
+  {
+    final RequestLogLine exportLine = sqlLine(
+        "INSERT INTO EXTERN(s3(bucket => 'x', prefix => 'y')) AS CSV SELECT * FROM kttm",
+        ImmutableMap.of("sqlQueryId", "msq-extern-1"),
+        ImmutableMap.of("success", true)
+    );
+
+    logger.logSqlQuery(exportLine);
+
+    Assertions.assertEquals(0, capturedEvents.size(), "EXTERN exports should not emit an output dataset");
+  }
+
+  /** A failed INSERT emits a FAIL event whose errorMessage facet carries the exception text. */
+  @Test
+  public void testMsqInsertFailure() throws IOException
+  {
+    final RequestLogLine failedLine = sqlLine(
+        "INSERT INTO \"kttm-result\" SELECT * FROM \"kttm\"",
+        ImmutableMap.of("sqlQueryId", "msq-fail-1"),
+        ImmutableMap.of("success", false, "exception", "Task failed")
+    );
+
+    logger.logSqlQuery(failedLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final ObjectNode failEvent = capturedEvents.get(0);
+    Assertions.assertEquals("FAIL", failEvent.get("eventType").asText());
+    Assertions.assertEquals(
+        "Task failed",
+        failEvent.get("run").get("facets").get("errorMessage").get("message").asText()
+    );
+  }
+
+  // --- Native query tests ---
+
+  /**
+   * A successful native query emits a COMPLETE event with its datasource as an input, the native timing and
+   * byte stats, no sql facet, and the native query type in the context facet.
+   */
+  @Test
+  public void testNativeQuery() throws IOException
+  {
+    final TestQuery nativeQuery = new TestQuery(
+        new TableDataSource("myDatasource"),
+        ImmutableMap.of("queryId", "native-qid-1")
+    );
+    final RequestLogLine nativeLogLine = nativeLine(
+        nativeQuery,
+        ImmutableMap.of("success", true, "query/time", 50L, "query/bytes", 512L)
+    );
+
+    logger.logNativeQuery(nativeLogLine);
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final ObjectNode emitted = capturedEvents.get(0);
+
+    Assertions.assertEquals("COMPLETE", emitted.get("eventType").asText());
+    Assertions.assertTrue(inputNames(emitted).contains("myDatasource"));
+
+    final JsonNode statsFacet = emitted.get("run").get("facets").get("druid_query_statistics");
+    Assertions.assertEquals(50L, statsFacet.get("durationMs").asLong());
+    Assertions.assertEquals(512L, statsFacet.get("bytes").asLong());
+
+    // Native queries carry no sql job facet.
+    Assertions.assertNull(emitted.get("job").get("facets").get("sql"));
+
+    // The context facet reports the query's own type.
+    Assertions.assertEquals(
+        "test",
+        emitted.get("run").get("facets").get("druid_query_context").get("queryType").asText()
+    );
+  }
+
+  /**
+   * A native query carrying a parent SQL query ID still emits an event with the plain native query type;
+   * the SQL origin shows up as sqlQueryId in the context facet.
+   */
+  @Test
+  public void testNativeSubQueryOfSqlEmits() throws IOException
+  {
+    final TestQuery subQuery = new TestQuery(
+        new TableDataSource("myDatasource"),
+        ImmutableMap.of("queryId", "native-qid-2", BaseQuery.SQL_QUERY_ID, "parent-sql-id")
+    );
+
+    logger.logNativeQuery(nativeLine(subQuery, ImmutableMap.of("success", true)));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    final ObjectNode emitted = capturedEvents.get(0);
+
+    // The query type is the native one, with no "sql/" prefix.
+    Assertions.assertEquals(
+        "test",
+        emitted.get("run").get("facets").get("druid_query_context").get("queryType").asText()
+    );
+
+    // The parent SQL query ID is exposed for correlation.
+    Assertions.assertEquals(
+        "parent-sql-id",
+        emitted.get("run").get("facets").get("druid_query_context").get("sqlQueryId").asText()
+    );
+
+    Assertions.assertTrue(inputNames(emitted).contains("myDatasource"));
+  }
+
+  /**
+   * Logging a native request line built with a {@code null} query must complete without
+   * throwing and must not capture any event.
+   */
+  @Test
+  public void testNativeQueryWithNullQueryDoesNotCrash() throws IOException
+  {
+    final QueryStats emptyStats = new QueryStats(ImmutableMap.of());
+    final RequestLogLine lineWithoutQuery = RequestLogLine.forNative(null, TIMESTAMP, REMOTE_ADDR, emptyStats);
+
+    logger.logNativeQuery(lineWithoutQuery);
+
+    Assertions.assertEquals(0, capturedEvents.size());
+  }
+
+  /**
+   * A native query reporting the type {@code "segmentMetadata"} must not produce any event.
+   */
+  @Test
+  public void testNativeQueryExcludedType() throws IOException
+  {
+    // Anonymous subclass reports "segmentMetadata" instead of TestQuery's own "test" type.
+    final Map<String, Object> queryContext = ImmutableMap.of("queryId", "native-qid-excluded");
+    final TestQuery segmentMetadataQuery = new TestQuery(new TableDataSource("myDatasource"), queryContext)
+    {
+      @Override
+      public String getType()
+      {
+        return "segmentMetadata";
+      }
+    };
+    final RequestLogLine logLine = nativeLine(segmentMetadataQuery, ImmutableMap.of("success", true));
+    logger.logNativeQuery(logLine);
+
+    // Nothing may have been captured for this query type.
+    Assertions.assertEquals(0, capturedEvents.size());
+  }
+
+  /**
+   * A native query over a {@code UnionDataSource} of two tables must emit one event whose
+   * inputs are exactly those two table names.
+   */
+  @Test
+  public void testNativeUnionDataSourceExtractsBothTables() throws IOException
+  {
+    // UnionDataSource.getTableNames() returns all member table names.
+    // JoinDataSource has the same behavior but requires complex construction (ExprMacroTable etc.),
+    // so we test UnionDataSource as the multi-table representative.
+    DataSource unionDs = new UnionDataSource(
+        List.of(new TableDataSource("leftTable"), new TableDataSource("rightTable"))
+    );
+    TestQuery query = new TestQuery(unionDs, ImmutableMap.of("queryId", "native-multi-1"));
+    logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true)));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    Set<String> names = inputNames(capturedEvents.get(0));
+    Assertions.assertEquals(2, names.size());
+    Assertions.assertTrue(names.contains("leftTable"));
+    Assertions.assertTrue(names.contains("rightTable"));
+  }
+
+  /**
+   * Checks the full shape of the event emitted for a successful native query: the OpenLineage
+   * envelope fields, the job facets (no {@code sql}, fixed {@code jobType} values), and the run
+   * facets (processing engine, query context, statistics, and no {@code errorMessage}).
+   */
+  @Test
+  public void testNativeQueryFacets() throws IOException
+  {
+    TestQuery query = new TestQuery(
+        new TableDataSource("t"),
+        ImmutableMap.of("queryId", "native-facets")
+    );
+    logger.logNativeQuery(nativeLine(query, ImmutableMap.of(
+        "success", true,
+        "query/time", 300L,
+        "query/bytes", 2048L,
+        "statusCode", "200",
+        "identity", "bob"
+    )));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    ObjectNode event = capturedEvents.get(0);
+
+    // OpenLineage envelope
+    Assertions.assertNotNull(event.get("schemaURL"));
+    Assertions.assertNotNull(event.get("producer"));
+    Assertions.assertEquals("2024-01-01T00:00:00.000Z", event.get("eventTime").asText());
+
+    // Job facets — no SQL facet
+    JsonNode jobFacets = event.get("job").get("facets");
+    Assertions.assertNull(jobFacets.get("sql"));
+    Assertions.assertEquals("BATCH", jobFacets.get("jobType").get("processingType").asText());
+    Assertions.assertEquals("DRUID", jobFacets.get("jobType").get("integration").asText());
+    Assertions.assertEquals("QUERY", jobFacets.get("jobType").get("jobType").asText());
+
+    // Run facets
+    JsonNode runFacets = event.get("run").get("facets");
+    Assertions.assertEquals("druid", runFacets.get("processing_engine").get("name").asText());
+
+    // Query context facet
+    JsonNode ctx = runFacets.get("druid_query_context");
+    Assertions.assertEquals("bob", ctx.get("identity").asText());
+    Assertions.assertEquals(REMOTE_ADDR, ctx.get("remoteAddress").asText());
+    Assertions.assertEquals("test", ctx.get("queryType").asText());
+    Assertions.assertNull(ctx.get("sqlQueryId"));
+
+    // Statistics facet
+    JsonNode stats = runFacets.get("druid_query_statistics");
+    Assertions.assertEquals(300L, stats.get("durationMs").asLong());
+    Assertions.assertEquals(2048L, stats.get("bytes").asLong());
+    Assertions.assertEquals("200", stats.get("statusCode").asText());
+
+    // No error facet on success
+    Assertions.assertNull(runFacets.get("errorMessage"));
+  }
+
+  /**
+   * A failed native query without a SQL query id must emit a FAIL event whose
+   * {@code errorMessage} facet carries the exception text and no {@code programmingLanguage}.
+   */
+  @Test
+  public void testNativeQueryFailure() throws IOException
+  {
+    TestQuery query = new TestQuery(
+        new TableDataSource("t"),
+        ImmutableMap.of("queryId", "native-fail")
+    );
+    logger.logNativeQuery(nativeLine(query, ImmutableMap.of(
+        "success", false,
+        "exception", "Query timed out"
+    )));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    ObjectNode event = capturedEvents.get(0);
+
+    Assertions.assertEquals("FAIL", event.get("eventType").asText());
+
+    JsonNode errorFacet = event.get("run").get("facets").get("errorMessage");
+    Assertions.assertNotNull(errorFacet);
+    Assertions.assertEquals("Query timed out", errorFacet.get("message").asText());
+    // Native query — no "programmingLanguage" field
+    Assertions.assertNull(errorFacet.get("programmingLanguage"));
+  }
+
+  /**
+   * A failed native query whose context carries {@code BaseQuery.SQL_QUERY_ID} must emit a FAIL
+   * event whose {@code errorMessage} facet reports {@code programmingLanguage} as "SQL".
+   */
+  @Test
+  public void testNativeSqlSubQueryFailure() throws IOException
+  {
+    TestQuery query = new TestQuery(
+        new TableDataSource("t"),
+        ImmutableMap.of("queryId", "native-sql-fail", BaseQuery.SQL_QUERY_ID, "parent-sql")
+    );
+    logger.logNativeQuery(nativeLine(query, ImmutableMap.of(
+        "success", false,
+        "exception", "Query timed out"
+    )));
+
+    Assertions.assertEquals(1, capturedEvents.size());
+    ObjectNode event = capturedEvents.get(0);
+
+    Assertions.assertEquals("FAIL", event.get("eventType").asText());
+
+    JsonNode errorFacet = event.get("run").get("facets").get("errorMessage");
+    Assertions.assertNotNull(errorFacet);
+    // SQL-originated query — has "programmingLanguage"
+    Assertions.assertEquals("SQL", errorFacet.get("programmingLanguage").asText());
+  }
+
+  /**
+   * Constructing a logger with the HTTP transport and {@code null} as the fourth argument
+   * (the URL, going by the test name) must throw {@link IllegalStateException}.
+   */
+  @Test
+  public void testConstructorThrowsWhenHttpWithoutUrl()
+  {
+    Assertions.assertThrows(IllegalStateException.class, () -> new OpenLineageRequestLogger(
+        MAPPER,
+        NAMESPACE,
+        OpenLineageRequestLoggerProvider.TransportType.HTTP,
+        null,
+        DEFAULT_EXCLUDED_NATIVE_QUERY_TYPES
+    ));
+  }
+
+  /**
+   * Calling {@code stop()} on a logger built with the HTTP transport must not throw.
+   */
+  @Test
+  public void testStopWithHttpTransport()
+  {
+    OpenLineageRequestLogger httpLogger = new OpenLineageRequestLogger(
+        MAPPER,
+        NAMESPACE,
+        OpenLineageRequestLoggerProvider.TransportType.HTTP,
+        "http://localhost:9999/api/v1/lineage",
+        DEFAULT_EXCLUDED_NATIVE_QUERY_TYPES
+    );
+    // Covers the executor-shutdown and httpClient-close branches in stop()
+    Assertions.assertDoesNotThrow(httpLogger::stop);
+  }
+
+  /**
+   * Builds a SQL request log line using the shared {@code TIMESTAMP} and {@code REMOTE_ADDR}.
+   *
+   * @param sql     SQL text to log
+   * @param context SQL query context
+   * @param stats   entries wrapped into the line's {@code QueryStats}
+   * @return the result of {@code RequestLogLine.forSql} for these values
+   */
+  private static RequestLogLine sqlLine(String sql, Map<String, Object> context, Map<String, Object> stats)
+  {
+    return RequestLogLine.forSql(sql, context, TIMESTAMP, REMOTE_ADDR, new QueryStats(stats));
+  }
+
+  /**
+   * Builds a native request log line using the shared {@code TIMESTAMP} and {@code REMOTE_ADDR}.
+   *
+   * @param query native query to log
+   * @param stats entries wrapped into the line's {@code QueryStats}
+   * @return the result of {@code RequestLogLine.forNative} for these values
+   */
+  private static RequestLogLine nativeLine(Query<?> query, Map<String, Object> stats)
+  {
+    return RequestLogLine.forNative(query, TIMESTAMP, REMOTE_ADDR, new QueryStats(stats));
+  }
+
+  private static Set<String> inputNames(JsonNode event)
+  {
+    Set<String> names = new HashSet<>();
+    for (JsonNode input : event.get("inputs")) {
+      names.add(input.get("name").asText());
+    }
+    return names;
+  }
+}
+
+/**
+ * Minimal {@link BaseQuery} used as a native-query fixture. It reports the type {@code "test"},
+ * has no filter, and does not support any of the {@code with*} copy methods.
+ */
+@JsonTypeName("test")
+class TestQuery extends BaseQuery<Object>
+{
+  // Fixed one-day interval passed to the superclass constructor for every instance.
+  private static final QuerySegmentSpec DUMMY_SPEC = new MultipleIntervalSegmentSpec(
+      Collections.singletonList(Intervals.of("2024-01-01/2024-01-02"))
+  );
+
+  /**
+   * @param dataSource data source the query reads from
+   * @param context    query context map
+   */
+  TestQuery(DataSource dataSource, Map<String, Object> context)
+  {
+    super(dataSource, DUMMY_SPEC, context);
+  }
+
+  @Override
+  public boolean hasFilters()
+  {
+    return false;
+  }
+
+  @Override
+  public DimFilter getFilter()
+  {
+    return null;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "test";
+  }
+
+  /** Not supported by this fixture. */
+  @Override
+  public Query<Object> withQuerySegmentSpec(QuerySegmentSpec spec)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported by this fixture. */
+  @Override
+  public Query<Object> withDataSource(DataSource dataSource)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported by this fixture. */
+  @Override
+  public Query<Object> withOverriddenContext(Map<String, Object> contextOverride)
+  {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/pom.xml b/pom.xml
index bab799d96ad..9827f232c19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -254,6 +254,7 @@
         <module>extensions-contrib/ambari-metrics-emitter</module>
         <module>extensions-contrib/sqlserver-metadata-storage</module>
         <module>extensions-contrib/kafka-emitter</module>
+        <module>extensions-contrib/openlineage-emitter</module>
         <module>extensions-contrib/redis-cache</module>
         <module>extensions-contrib/opentsdb-emitter</module>
         <module>extensions-contrib/momentsketch</module>
diff --git a/website/.spelling b/website/.spelling
index c85463563e1..7452721a172 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -62,6 +62,7 @@ CORS
 CNF
 CPUs
 CSVs
+CTEs
 CentralizedDatasourceSchema
 Ceph
 circledR
@@ -105,6 +106,7 @@ ECS
 EMR
 EMRFS
 ETL
+emits
 Elasticsearch
 Enums
 FIRST_VALUE
@@ -196,6 +198,7 @@ Lucene
 MapBD
 MapDB
 MariaDB
+Marquez
 MiddleManager
 MiddleManagers
 Montréal
@@ -211,6 +214,8 @@ OLAP
 OOMs
 OpenJDK
 OpenLDAP
+openlineage
+OpenLineage
 OpenTSDB
 OutputStream
 ParAccel
@@ -333,6 +338,7 @@ deepstore
 denormalization
 denormalize
 denormalized
+deduplicated
 deprioritization
 deprioritizes
 dequeued
@@ -416,6 +422,7 @@ kubexit
 k8s
 laning
 lifecycle
+lineage
 localhost
 log4j
 log4j2


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to