[
https://issues.apache.org/jira/browse/SPARK-54071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yang Guo updated SPARK-54071:
-----------------------------
Description:
# OpenLineage Spark Integration: File Sink Missing Output Information in Spark
Structured Streaming Mode
## OpenLineage version
`io.openlineage:openlineage-spark_2.13:1.39.0`
## Technology and package versions
**Python:** 3.13.3
**Scala:** 2.13.16
**Java:** OpenJDK 64-Bit Server VM, 17.0.16
**pip freeze:**
```
py4j==0.10.9.9
pyspark==4.0.1
```
## Environment configuration
I used the default configuration for OpenLineage and submitted Spark natively
on my local machine. No managed services were involved.
```bash
$ git clone https://github.com/MarquezProject/marquez.git && cd marquez
$ ./docker/up.sh
```
## Problem details
When using Spark Structured Streaming to write Parquet files to a file system:
- **File sink** generates only an OpenLineage event with the streaming
processing type, and its output information is **empty**.
- **foreachBatch sink** generates OpenLineage events with both the streaming
and batch processing types; the batch-type event carries **valid output
information**.
**The bug:** the OpenLineage Spark integration cannot generate an OpenLineage
event with valid output details for the file sink in Spark Structured
Streaming.
## File sink
### Sample code:
```python
query = streaming_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .partitionBy('year', 'month', 'day') \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', output_path) \
    .queryName('filesink') \
    .start()
```
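(`streaming_df` itself is not shown above. As a minimal sketch, here is how it
could be built from a rate source; the column derivations are reconstructed
from the analyzed logical plan shown further below, and the details are
assumptions rather than the exact code used.)
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName('filesink').getOrCreate()

# Hypothetical reconstruction of streaming_df: a rate source plus the
# id/name/value/year/month/day projections visible in the analyzed
# logical plan later in this report.
streaming_df = (
    spark.readStream.format('rate').option('rowsPerSecond', 1).load()
    .withColumn('id', F.col('value') % 1000)
    .withColumn('name', F.concat(F.lit('user_'),
                                 (F.col('value') % 100).cast('string')))
    .withColumn('value', F.rand() * 100)
    .withColumn('year', F.year(F.col('timestamp').cast('date')))
    .withColumn('month', F.month(F.col('timestamp').cast('date')))
    .withColumn('day', F.dayofmonth(F.col('timestamp').cast('date')))
    .select('id', 'name', 'timestamp', 'value', 'year', 'month', 'day')
)
```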
### Sample event for "processingType":"STREAMING"
```json
{
  "eventTime": "2025-10-28T13:45:34.282Z",
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark",
  "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
  "eventType": "COMPLETE",
  "run": {
    "runId": "019a2b14-4e9d-7574-95a9-55182f07591d",
    "facets": {
      "parent": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet",
        "run": {"runId": "019a2b10-5218-7ba3-9cf4-1ce4b1800752"},
        "job": {"namespace": "spark_namespace", "name": "filesink"},
        "root": {
          "run": {"runId": "019a2b10-5218-7ba3-9cf4-1ce4b1800752"},
          "job": {"namespace": "airflow_namespace", "name": "airflow_dag.airflow_task"}
        }
      }
    }
  },
  ...
}
```
was:
### OpenLineage version
`io.openlineage:openlineage-spark_2.13:1.39.0`
### Technology and package versions
Python: 3.13.3
Scala: 2.13.16
Java: OpenJDK 64-Bit Server VM, 17.0.16
`pip freeze`
py4j==0.10.9.9
pyspark==4.0.1
### Environment configuration
I used the default configuration for OpenLineage and submitted Spark natively
on my local machine. No managed services were involved.
```
$ git clone https://github.com/MarquezProject/marquez.git && cd marquez
$ ./docker/up.sh
```
### Problem details
When using Spark Structured Streaming to write Parquet files to a file system:
- File sink generates only an OpenLineage event with the streaming processing
type, and its output information is empty.
- foreachBatch sink generates OpenLineage events with both the streaming and
batch processing types; the batch-type event carries valid output information.
The bug is that the OpenLineage Spark integration cannot generate an
OpenLineage event with valid output details for the file sink in Spark
Structured Streaming. More details about the sample code and sample events
follow.
### File sink:
#### Sample code:
```
query = streaming_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .partitionBy('year', 'month', 'day') \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', output_path) \
    .queryName('filesink') \
    .start()
```
#### Sample event for "processingType":"STREAMING"
```
25/10/29 00:49:02 DEBUG wire: http-outgoing-52 >>
"\{"eventTime":"2025-10-28T13:45:34.282Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b14-4e9d-7574-95a9-55182f07591d","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"spark_namespace","name":"filesink"},"root":\{"run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"filesink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"filesink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
```
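Note the tail of this event: the job is typed STREAMING, yet both dataset
arrays are empty (verbatim excerpt from the log line above):
```
"processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}
```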
### foreachBatch sink:
#### Sample code:
```
def write_to_file(batch_df, batch_id):
    if batch_df.count() > 0:
        batch_df.write \
            .mode("append") \
            .partitionBy("year", "month", "day") \
            .parquet(output_path)

query = streaming_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_file) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime='10 seconds') \
    .start()
```
The above code will generate events of both the streaming and batch processing
types, because each micro-batch executes as a regular batch write (visible as
`execute_insert_into_hadoop_fs_relation_command` in the batch event's job name
below).
#### Sample streaming type event
```
25/10/29 01:04:45 DEBUG wire: http-outgoing-1 >>
"\{"eventTime":"2025-10-28T14:04:43.373Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b22-b364-79ca-be87-2d173c25c16c","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
```
#### Sample batch type event
```
25/10/29 01:07:26 DEBUG wire: http-outgoing-33 >>
"\{"eventTime":"2025-10-28T14:07:20.711Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b25-28f6-7fa0-a4e2-2aaba4f61d7e","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.execute_insert_into_hadoop_fs_relation_command.tests_output_foreachbatchsink","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[\{"namespace":"file","name":"/Users/xxxx/venvs/tests/output_foreachbatchsink","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"id","type":"long"},\{"name":"name","type":"string"},\{"name":"timestamp","type":"timestamp"},\{"name":"value","type":"double"},\{"name":"year","type":"integer"},\{"name":"month","type":"integer"},\{"name":"day","type":"integer"}]}},"outputFacets":\{"outputStatistics":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet","rowCount":10,"size":13259,"fileCount":10}}}]}"
```
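By contrast, the batch-type event carries a populated outputs array with the
dataset name and output statistics (excerpt from the log line above,
reformatted for readability; nested facet bodies elided):
```
"outputs": [{
  "namespace": "file",
  "name": "/Users/xxxx/venvs/tests/output_foreachbatchsink",
  "facets": {"dataSource": {...}, "schema": {...}},
  "outputFacets": {
    "outputStatistics": {"rowCount": 10, "size": 13259, "fileCount": 10}
  }
}]
```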
### What you think should happen instead
The OpenLineage Spark integration should capture the output information for
the streaming processing type, since that information is present in the
query's logical plan.
Here is the logical plan for the streaming query using the file sink.
```
== Analyzed Logical Plan ==
id: bigint, name: string, timestamp: timestamp, value: double, year: int, month: int, day: int
~WriteToMicroBatchDataSourceV1 FileSink[file:/Users/xxx/venvs/tests/output_filesink], 753bdc9a-07cd-4788-a17d-27ff622ababc, [checkpointLocation=checkpoint_filesink, path=output_filesink, queryName=filesink], Append, 1
+- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month#6, dayofmonth(cast(timestamp#0 as date)) AS day#7]
   +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month(cast(timestamp#0 as date)) AS month#6]
      +- ~Project [id#2L, name#3, timestamp#0, value#4, year(cast(timestamp#0 as date)) AS year#5]
         +- ~Project [(value#1L % cast(1000 as bigint)) AS id#2L, concat(user_, cast((value#1L % cast(100 as bigint)) as string)) AS name#3, timestamp#0, (rand(-1344458628259366487) * cast(100 as double)) AS value#4]
            +- ~StreamingDataSourceV2ScanRelation[timestamp#0, value#1L] RateStream(rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=12)
```
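To make this concrete, here is a minimal sketch (mine, not part of the
integration) showing that the output location is mechanically recoverable from
the analyzed plan string; a real fix would presumably read the path from the
FileSink held by the WriteToMicroBatchDataSourceV1 node directly rather than
parse text:
```python
import re

# plan_text stands in for the analyzed plan string shown above; the
# FileSink node carries the output location the integration should report.
plan_text = ("~WriteToMicroBatchDataSourceV1 "
             "FileSink[file:/Users/xxx/venvs/tests/output_filesink], ...")

match = re.search(r"FileSink\[([^\]]+)\]", plan_text)
if match:
    print(match.group(1))  # file:/Users/xxx/venvs/tests/output_filesink
```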
### How to reproduce
- Step 1: Install PySpark on your local machine:
https://spark.apache.org/docs/latest/api/python/getting_started/install.html
- Step 2: Install an OpenLineage server on your local machine:
https://openlineage.io/getting-started
- Step 3: Run
[file_sink.py](https://github.com/user-attachments/files/23189858/file_sink.py)
and
[foreachbatch_sink.py](https://github.com/user-attachments/files/23189859/foreachbatch_sink.py)
with the spark-submit command below. You will see the OpenLineage events in
the debug logs.
```
spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://localhost:5000" \
  --conf "spark.openlineage.namespace=spark_namespace" \
  --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
  --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
  --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
  [filename].py
```
> Spark Structured Streaming Filesink can not generate open lineage with output
> details
> -------------------------------------------------------------------------------------
>
> Key: SPARK-54071
> URL: https://issues.apache.org/jira/browse/SPARK-54071
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.0.1
> Environment: ### OpenLineage version
> `io.openlineage:openlineage-spark_2.13:1.39.0`
> ### Technology and package versions
> Python: 3.13.3
> Scala: 2.13.16
> Java: OpenJDK 64-Bit Server VM, 17.0.16
> `pip freeze`
> py4j==0.10.9.9
> pyspark==4.0.1
>
> ### spark-submit command
> ```
> spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0
> --conf
> "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
> --conf "spark.openlineage.transport.type=http" --conf
> "spark.openlineage.transport.url=http://localhost:5000" --conf
> "spark.openlineage.namespace=spark_namespace" --conf
> "spark.openlineage.parentJobNamespace=airflow_namespace" --conf
> "spark.openlineage.parentJobName=airflow_dag.airflow_task" --conf
> "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" [filename].py
> ```
> Reporter: Yang Guo
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]