Yang Guo created SPARK-54071:
--------------------------------

             Summary: Spark Structured Streaming Filesink can not generate open 
lineage with output details
                 Key: SPARK-54071
                 URL: https://issues.apache.org/jira/browse/SPARK-54071
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 4.0.1
         Environment: ### OpenLineage version

`io.openlineage:openlineage-spark_2.13:1.39.0`

### Technology and package versions

Python:  3.13.3
Scala: 2.13.16
Java: OpenJDK 64-Bit Server VM, 17.0.16

`pip freeze`
py4j==0.10.9.9
pyspark==4.0.1

 

### spark-submit command

```

spark-submit   --packages io.openlineage:openlineage-spark_2.13:1.39.0   --conf 
"spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"   
--conf "spark.openlineage.transport.type=http"   --conf 
"spark.openlineage.transport.url=http://localhost:5000";   --conf 
"spark.openlineage.namespace=spark_namespace"   --conf 
"spark.openlineage.parentJobNamespace=airflow_namespace"   --conf 
"spark.openlineage.parentJobName=airflow_dag.airflow_task"   --conf 
"spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx"   [filename].py

```
            Reporter: Yang Guo


### OpenLineage version

`io.openlineage:openlineage-spark_2.13:1.39.0`

### Technology and package versions

Python:  3.13.3
Scala: 2.13.16
Java: OpenJDK 64-Bit Server VM, 17.0.16

`pip freeze`
py4j==0.10.9.9
pyspark==4.0.1

### Environment configuration

I used the default config for openlineage and submit native spark on local 
machine. There is no managed services involved.
```
$ git clone https://github.com/MarquezProject/marquez.git && cd marquez

$ ./docker/up.sh
```

### Problem details

When using Spark structured streaming to write parquet file to file systems,
- File sink will only generate openlineage event with streaming processing type 
with output information as empty.
- Foreachbatch sink will generate openlineage event with both streaming 
processing type and batch processing type. The batch processing type will have 
valid output information.

The bug is that the openlineage spark integration can not generate openlineage 
event with valid output details for File sink in Spark structured streaming.

More details about the sample code and sample events are following.
### File sink:

#### Sample code:
```
query = streaming_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .partitionBy('year', 'month', 'day') \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', output_path) \
    .queryName('filesink') \
    .start()
```

#### Sample event for "processingType":"STREAMING"
```
25/10/29 00:49:02 DEBUG wire: http-outgoing-52 >> 
"\{"eventTime":"2025-10-28T13:45:34.282Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b14-4e9d-7574-95a9-55182f07591d","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"spark_namespace","name":"filesink"},"root":\{"run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"filesink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"filesink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}";
```
### foreachbatch sink
#### sample code
```
def write_to_file(batch_df, batch_id):
    if batch_df.count() > 0:
        batch_df.write \
            .mode("append") \
            .partitionBy("year", "month", "day") \
            .parquet(output_path)

query = streaming_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_file) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime='10 seconds') \
    .start()
```

The above code with generate both streaming and batch processing type event.

#### Sample streaming type event
```
25/10/29 01:04:45 DEBUG wire: http-outgoing-1 >> 
"\{"eventTime":"2025-10-28T14:04:43.373Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b22-b364-79ca-be87-2d173c25c16c","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}";
```
#### Sample batch type event
```
25/10/29 01:07:26 DEBUG wire: http-outgoing-33 >> 
"\{"eventTime":"2025-10-28T14:07:20.711Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b25-28f6-7fa0-a4e2-2aaba4f61d7e","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":\{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":\{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":\{"namespace":"spark_namespace","name":"foreachbatchsink.execute_insert_into_hadoop_fs_relation_command.tests_output_foreachbatchsink","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[\{"namespace":"file","name":"/Users/xxxx/venvs/tests/output_foreachbatchsink","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":\{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"id","type":"long"},\{"name":"name","type":"string"},\{"name":"timestamp","type":"timestamp"},\{"name":"value","type":"double"},\{"name":"year","type":"integer"},\{"name":"month","type":"integer"},\{"name":"day","type":"integer"}]}},"outputFacets":\{"outputStatistics":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet","rowCount":10,"size":13259,"fileCount":10}}}]}";
```

 

### What you think should happen instead

openlineage spark lineage should capture the output information for streaming 
processing type successfully as the information is in the spark query logic 
plan.

Here is the logic plan for streaming query using file sink.
```
== Analyzed Logical Plan ==
id: bigint, name: string, timestamp: timestamp, value: double, year: int, 
month: int, day: int
~WriteToMicroBatchDataSourceV1 
FileSink[file:/Users/xxx/venvs/tests/output_filesink], 
753bdc9a-07cd-4788-a17d-27ff622ababc, [checkpointLocation=checkpoint_filesink, 
path=output_filesink, queryName=filesink], Append, 1
+- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month#6, 
dayofmonth(cast(timestamp#0 as date)) AS day#7]
   +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, 
month(cast(timestamp#0 as date)) AS month#6]
      +- ~Project [id#2L, name#3, timestamp#0, value#4, year(cast(timestamp#0 
as date)) AS year#5]
         +- ~Project [(value#1L % cast(1000 as bigint)) AS id#2L, concat(user_, 
cast((value#1L % cast(100 as bigint)) as string)) AS name#3, timestamp#0, 
(rand(-1344458628259366487) * cast(100 as double)) AS value#4]
            +- ~StreamingDataSourceV2ScanRelation[timestamp#0, value#1L] 
RateStream(rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=12)
```

### How to reproduce

- step 1: install pyspark on your local machine 
https://spark.apache.org/docs/latest/api/python/getting_started/install.html
- step 2: install openlineage server on your local machine 
https://openlineage.io/getting-started
- step 3: refer to following spark-submit command to run 
[file_sink.py](https://github.com/user-attachments/files/23189858/file_sink.py) 
and 
[foreachbatch_sink.py](https://github.com/user-attachments/files/23189859/foreachbatch_sink.py).
 You will see the open lineage event in the debug logs.
```
spark-submit   --packages io.openlineage:openlineage-spark_2.13:1.39.0   --conf 
"spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"   
--conf "spark.openlineage.transport.type=http"   --conf 
"spark.openlineage.transport.url=http://localhost:5000";   --conf 
"spark.openlineage.namespace=spark_namespace"   --conf 
"spark.openlineage.parentJobNamespace=airflow_namespace"   --conf 
"spark.openlineage.parentJobName=airflow_dag.airflow_task"   --conf 
"spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx"   [filename].py
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to