Yang Guo created SPARK-54071:
--------------------------------
Summary: Spark Structured Streaming FileSink does not generate OpenLineage
events with output details
Key: SPARK-54071
URL: https://issues.apache.org/jira/browse/SPARK-54071
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 4.0.1
Environment:
### OpenLineage version
`io.openlineage:openlineage-spark_2.13:1.39.0`
### Technology and package versions
Python: 3.13.3
Scala: 2.13.16
Java: OpenJDK 64-Bit Server VM, 17.0.16
`pip freeze`
py4j==0.10.9.9
pyspark==4.0.1
### spark-submit command
```
spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://localhost:5000" \
  --conf "spark.openlineage.namespace=spark_namespace" \
  --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
  --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
  --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
  [filename].py
```
Reporter: Yang Guo
### Environment configuration
I used the default OpenLineage configuration and submitted Spark jobs natively
on my local machine; no managed services were involved. The OpenLineage server
(Marquez) was started with:
```
$ git clone https://github.com/MarquezProject/marquez.git && cd marquez
$ ./docker/up.sh
```
### Problem details
When using Spark Structured Streaming to write Parquet files to a file system:
- The file sink only generates an OpenLineage event with the STREAMING
processing type, and its output information is empty.
- The foreachBatch sink generates OpenLineage events with both the STREAMING
and BATCH processing types; the BATCH event carries valid output information.
The bug is that the OpenLineage Spark integration cannot generate an
OpenLineage event with valid output details for the file sink in Spark
Structured Streaming. Sample code and sample events follow.
### File sink:
#### Sample code:
```
query = streaming_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .partitionBy('year', 'month', 'day') \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', output_path) \
    .queryName('filesink') \
    .start()
```
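For context, `streaming_df`, `checkpoint_path`, and `output_path` are defined elsewhere in the script. A minimal sketch of how `streaming_df` could be constructed, inferred from the analyzed logical plan shown later in this report (the rate source and derived columns match the plan; treat it as an approximation, not the attached script):
```python
# Sketch of a streaming DataFrame consistent with the logical plan below:
# a rate source with derived id/name/value/year/month/day columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, dayofmonth, lit, month, rand, year

spark = SparkSession.builder.appName('filesink').getOrCreate()

streaming_df = (
    spark.readStream.format('rate')
    .option('rowsPerSecond', 1)
    .load()  # the rate source yields columns: timestamp, value
    .withColumn('id', col('value') % 1000)
    .withColumn('name', concat(lit('user_'), (col('value') % 100).cast('string')))
    .withColumn('value', rand() * 100)  # overwrite the rate counter with a random metric
    .withColumn('year', year(col('timestamp')))
    .withColumn('month', month(col('timestamp')))
    .withColumn('day', dayofmonth(col('timestamp')))
)
```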
#### Sample event for "processingType":"STREAMING"
```
25/10/29 00:49:02 DEBUG wire: http-outgoing-52 >>
"\{"eventTime":"2025-10-28T13:45:34.282Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b14-4e9d-7574-95a9-55182f07591d","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"spark_namespace","name":"filesink"},"root":\{"run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"filesink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"filesink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
```
### ForeachBatch sink:
#### Sample code:
```
def write_to_file(batch_df, batch_id):
    if batch_df.count() > 0:
        batch_df.write \
            .mode("append") \
            .partitionBy("year", "month", "day") \
            .parquet(output_path)

query = streaming_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_file) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime='10 seconds') \
    .start()
```
The above code generates both streaming and batch processing type events: each
micro-batch invokes `write_to_file`, which performs an ordinary batch write, so
the integration also emits a BATCH event (note the
`execute_insert_into_hadoop_fs_relation_command` job name below) with valid
output details.
#### Sample streaming type event
```
25/10/29 01:04:45 DEBUG wire: http-outgoing-1 >>
"\{"eventTime":"2025-10-28T14:04:43.373Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b22-b364-79ca-be87-2d173c25c16c","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
```
#### Sample batch type event
```
25/10/29 01:07:26 DEBUG wire: http-outgoing-33 >>
"\{"eventTime":"2025-10-28T14:07:20.711Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b25-28f6-7fa0-a4e2-2aaba4f61d7e","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.execute_insert_into_hadoop_fs_relation_command.tests_output_foreachbatchsink","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[\{"namespace":"file","name":"/Users/xxxx/venvs/tests/output_foreachbatchsink","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"id","type":"long"},\{"name":"name","type":"string"},\{"name":"timestamp","type":"timestamp"},\{"name":"value","type":"double"},\{"name":"year","type":"integer"},\{"name":"month","type":"integer"},\{"name":"day","type":"integer"}]}},"outputFacets":\{"outputStatistics":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet","rowCount":10,"size":13259,"fileCount":10}}}]}"
```
### What you think should happen instead
The OpenLineage Spark integration should capture the output information for
the streaming processing type, since that information is present in the
streaming query's logical plan: the `FileSink` node carries the output path.
Here is the logical plan for the streaming query using the file sink.
```
== Analyzed Logical Plan ==
id: bigint, name: string, timestamp: timestamp, value: double, year: int, month: int, day: int
~WriteToMicroBatchDataSourceV1 FileSink[file:/Users/xxx/venvs/tests/output_filesink], 753bdc9a-07cd-4788-a17d-27ff622ababc, [checkpointLocation=checkpoint_filesink, path=output_filesink, queryName=filesink], Append, 1
+- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month#6, dayofmonth(cast(timestamp#0 as date)) AS day#7]
   +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month(cast(timestamp#0 as date)) AS month#6]
      +- ~Project [id#2L, name#3, timestamp#0, value#4, year(cast(timestamp#0 as date)) AS year#5]
         +- ~Project [(value#1L % cast(1000 as bigint)) AS id#2L, concat(user_, cast((value#1L % cast(100 as bigint)) as string)) AS name#3, timestamp#0, (rand(-1344458628259366487) * cast(100 as double)) AS value#4]
            +- ~StreamingDataSourceV2ScanRelation[timestamp#0, value#1L] RateStream(rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=12)
```
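A plan like the one above can be printed from the driver after at least one micro-batch has run; `StreamingQuery.explain(extended=True)` is part of the PySpark API and reports the plans of the most recently executed micro-batch (so the sleep below is only there to let a batch complete first):
```python
import time

# Give the query time to process at least one micro-batch,
# then print its logical and physical plans.
time.sleep(15)
query.explain(extended=True)
```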
### How to reproduce
- Step 1: Install PySpark on your local machine:
https://spark.apache.org/docs/latest/api/python/getting_started/install.html
- Step 2: Install an OpenLineage server on your local machine:
https://openlineage.io/getting-started
- Step 3: Run
[file_sink.py](https://github.com/user-attachments/files/23189858/file_sink.py)
and
[foreachbatch_sink.py](https://github.com/user-attachments/files/23189859/foreachbatch_sink.py)
with the following spark-submit command. You will see the OpenLineage events
in the debug logs.
```
spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://localhost:5000" \
  --conf "spark.openlineage.namespace=spark_namespace" \
  --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
  --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
  --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
  [filename].py
```
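If you prefer to avoid command-line flags, the same configuration can be set programmatically; here is a sketch mirroring the flags above (values unchanged, `spark.jars.packages` standing in for `--packages`; note that `spark.extraListeners` only takes effect if set before the SparkContext is created):
```python
from pyspark.sql import SparkSession

# Programmatic equivalent of the spark-submit flags above.
spark = (
    SparkSession.builder
    .appName('filesink')
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.13:1.39.0')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    .config('spark.openlineage.transport.type', 'http')
    .config('spark.openlineage.transport.url', 'http://localhost:5000')
    .config('spark.openlineage.namespace', 'spark_namespace')
    .config('spark.openlineage.parentJobNamespace', 'airflow_namespace')
    .config('spark.openlineage.parentJobName', 'airflow_dag.airflow_task')
    .config('spark.openlineage.parentRunId', 'xxxx-xxxx-xxxx-xxxx')  # placeholder, as above
    .getOrCreate()
)
```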