[ 
https://issues.apache.org/jira/browse/SPARK-54071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Guo updated SPARK-54071:
-----------------------------
    Attachment: foreachbatch_sink.py

> Spark Structured Streaming Filesink can not generate open lineage with output 
> details
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-54071
>                 URL: https://issues.apache.org/jira/browse/SPARK-54071
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.1
>         Environment: ### OpenLineage version
> `io.openlineage:openlineage-spark_2.13:1.39.0`
> ### Technology and package versions
> Python:  3.13.3
> Scala: 2.13.16
> Java: OpenJDK 64-Bit Server VM, 17.0.16
> `pip freeze`
> py4j==0.10.9.9
> pyspark==4.0.1
>  
> ### spark-submit command
> ```
> spark-submit \
>   --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
>   --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
>   --conf "spark.openlineage.transport.type=http" \
>   --conf "spark.openlineage.transport.url=http://localhost:5000" \
>   --conf "spark.openlineage.namespace=spark_namespace" \
>   --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
>   --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
>   --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
>   [filename].py
> ```
>            Reporter: Yang Guo
>            Priority: Major
>         Attachments: file_sink.py, foreachbatch_sink.py
>
>
> # OpenLineage Spark Integration: File Sink Missing Output Information in Spark Structured Streaming Mode
> ## OpenLineage version
> `io.openlineage:openlineage-spark_2.13:1.39.0`
> ## Technology and package versions
> **Python:** 3.13.3  
> **Scala:** 2.13.16  
> **Java:** OpenJDK 64-Bit Server VM, 17.0.16
> **pip freeze:**
> ```
> py4j==0.10.9.9
> pyspark==4.0.1
> ```
> ## Environment configuration
> I used the default OpenLineage configuration and submitted a native Spark 
> job on my local machine. No managed services are involved.
> ```bash
> $ git clone https://github.com/MarquezProject/marquez.git && cd marquez
> $ ./docker/up.sh
> ```
> ## Problem details
> When using Spark Structured Streaming to write Parquet files to a file system:
> - **File sink** only generates OpenLineage events with the streaming 
> processing type, and their output information is **empty**.
> - **foreachBatch sink** generates OpenLineage events with both the streaming 
> and the batch processing types. The batch events 
> have **valid output information**.
> **The bug:** The OpenLineage Spark integration cannot generate OpenLineage 
> events with valid output details for the file sink in Spark Structured Streaming.
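
To check captured events for the symptom described above, a small helper along these lines may be useful. It is a sketch that assumes each event follows the OpenLineage run-event layout, with a top-level `outputs` array and the processing type under `job.facets.jobType.processingType`. The function name `summarize_event` and both sample payloads are illustrative; they are not taken from the attached scripts or from real runs.

```python
def summarize_event(event):
    """Return the fields of one OpenLineage run event that matter for this report."""
    job_type = event.get("job", {}).get("facets", {}).get("jobType", {})
    outputs = event.get("outputs") or []
    return {
        "eventType": event.get("eventType"),
        "processingType": job_type.get("processingType"),
        "outputs": [o.get("name") for o in outputs],
        "has_outputs": bool(outputs),
    }


# Illustrative payloads only (hypothetical values, heavily trimmed).
streaming_event = {
    "eventType": "COMPLETE",
    "job": {
        "namespace": "spark_namespace",
        "name": "filesink",
        "facets": {"jobType": {"processingType": "STREAMING"}},
    },
    "outputs": [],  # the symptom: no output datasets reported
}
batch_event = {
    "eventType": "COMPLETE",
    "job": {
        "namespace": "spark_namespace",
        "name": "foreachbatchsink",
        "facets": {"jobType": {"processingType": "BATCH"}},
    },
    "outputs": [{"namespace": "file", "name": "/tmp/output"}],
}

for event in (streaming_event, batch_event):
    print(summarize_event(event))
```

Running the helper over the events received by the HTTP transport makes it easy to spot which processing types arrive without output datasets.
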
> ## File sink
> ### Sample code:
> ```python
> query = streaming_df.writeStream \
>     .format('parquet') \
>     .outputMode('append') \
>     .partitionBy('year', 'month', 'day') \
>     .option('checkpointLocation', checkpoint_path) \
>     .option('path', output_path) \
>     .queryName('filesink') \
>     .start()
> ```
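
For comparison with the file sink sample above, a foreachBatch-based sink could look roughly like the following. This is a minimal sketch and not the attached `foreachbatch_sink.py`: the function names `write_batch` and `start_foreachbatch_query` and the query name `foreachbatchsink` are hypothetical, and `streaming_df`, `output_path`, and `checkpoint_path` are assumed to be defined as in the file sink sample.

```python
def write_batch(batch_df, batch_id, output_path):
    """Write one micro-batch as partitioned Parquet with the batch DataFrameWriter."""
    (batch_df.write
        .mode("append")
        .partitionBy("year", "month", "day")
        .parquet(output_path))


def start_foreachbatch_query(streaming_df, output_path, checkpoint_path):
    """Start a streaming query that hands every micro-batch to write_batch."""
    def _sink(batch_df, batch_id):
        # foreachBatch calls the sink with (DataFrame, batch id).
        write_batch(batch_df, batch_id, output_path)

    return (streaming_df.writeStream
        .foreachBatch(_sink)
        .option("checkpointLocation", checkpoint_path)
        .queryName("foreachbatchsink")
        .start())
```

Because the write inside `write_batch` is an ordinary batch write, this shape would be consistent with the report's observation that the foreachBatch sink also produces batch-type events carrying output details.
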
> ### Sample event for "processingType":"STREAMING"
> ```json
> {
>   "eventTime": "2025-10-28T13:45:34.282Z",
>   "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark",
>   "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
>   "eventType": "COMPLETE",
>   "run": {
>     "runId": "019a2b14-4e9d-7574-95a9-55182f07591d",
>     "facets": {
>       "parent": {
>         "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark",
>         "_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet",
>         "run": {"runId": "019a2b10-5218-7ba3-9cf4-1ce4b1800752"},
>         "job": {"namespace": "spark_namespace", "name": "filesink"},
>         "root": {
>           "run": {"runId": "019a2b10-5218-7ba3-9cf4-1ce4b1800752"},
>           "job": {"namespace": "airflow_namespace", "name": "airflow_dag.airflow_task"}
>         }
>       }
>     }
>   },



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
