[
https://issues.apache.org/jira/browse/SPARK-54071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yang Guo updated SPARK-54071:
-----------------------------
Description:
h2. Environment details
h3. OpenLineage version
{quote}io.openlineage:openlineage-spark_2.13:1.39.0{quote}
h3. Technology and package versions
{quote}Python: 3.13.3
Scala: 2.13.16
Java: OpenJDK 64-Bit Server VM, 17.0.16
pip freeze:
py4j==0.10.9.9
pyspark==4.0.1{quote}
For the OpenLineage setup, I used the default settings:
{quote}$ git clone https://github.com/MarquezProject/marquez.git && cd marquez
$ ./docker/up.sh{quote}
h3. Spark Deployment details
I used native Spark on a local machine; no managed services are involved.
h2. Problem details
When using Spark Structured Streaming to write parquet files to a file system:
* The file sink generates only an OpenLineage event with the streaming processing type, and its output information is empty.
* The foreachBatch sink generates OpenLineage events with both the streaming and the batch processing type; the batch-type event carries valid output information.
The bug is that the file sink in Spark Structured Streaming does not emit an OpenLineage event with output details.
Sample code and sample events follow.
h3. File sink
Sample code:
{quote}
query = streaming_df.writeStream \
    .format('parquet') \
    .outputMode('append') \
    .partitionBy('year', 'month', 'day') \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', output_path) \
    .queryName('filesink') \
    .start()
{quote}
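For a self-contained repro, streaming_df can be reconstructed from the analyzed logical plan shown later in this report: a rate source with a few derived columns. This is a minimal sketch with the expressions inferred from that plan, not taken from the attached file_sink.py, so details may differ:
{quote}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('filesink').getOrCreate()

# The rate source emits (timestamp, value) rows; derive the remaining columns.
streaming_df = (
    spark.readStream.format('rate')
    .option('rowsPerSecond', 1)
    .load()
    .withColumn('id', F.col('value') % 1000)
    .withColumn('name', F.concat(F.lit('user_'), (F.col('value') % 100).cast('string')))
    .withColumn('value', F.rand() * 100)
    .withColumn('year', F.year('timestamp'))
    .withColumn('month', F.month('timestamp'))
    .withColumn('day', F.dayofmonth('timestamp'))
)
{quote}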
Sample event for "processingType":"STREAMING" (note the empty "inputs" and "outputs" arrays at the end):
{quote}
25/10/29 00:49:02 DEBUG wire: http-outgoing-52 >>
"{"eventTime":"2025-10-28T13:45:34.282Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b14-4e9d-7574-95a9-55182f07591d","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":{"namespace":"spark_namespace","name":"filesink"},"root":{"run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"filesink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"filesink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"{quote}
h3. ForeachBatch sink
Sample code:
{quote}
def write_to_file(batch_df, batch_id):
    if batch_df.count() > 0:
        batch_df.write \
            .mode("append") \
            .partitionBy("year", "month", "day") \
            .parquet(output_path)
{quote}
{quote}
query = streaming_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_file) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(processingTime='10 seconds') \
    .start()
{quote}
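Incidentally, the count() guard above evaluates the whole micro-batch once and the write then recomputes it. This does not affect the lineage events, but an equivalent guard that avoids the extra pass is isEmpty() (PySpark 3.3+):
{quote}
def write_to_file(batch_df, batch_id):
    # isEmpty() checks for a single row instead of counting the whole
    # micro-batch, so the data is only materialized by the write itself.
    if not batch_df.isEmpty():
        batch_df.write \
            .mode("append") \
            .partitionBy("year", "month", "day") \
            .parquet(output_path)
{quote}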
The above code generates both streaming and batch processing type events: each micro-batch runs a regular batch write inside write_to_file, which the OpenLineage listener reports as a separate batch job (visible as execute_insert_into_hadoop_fs_relation_command in the event below).
Sample streaming type event
{quote}
25/10/29 01:04:45 DEBUG wire: http-outgoing-1 >>
"{"eventTime":"2025-10-28T14:04:43.373Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b22-b364-79ca-be87-2d173c25c16c","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"foreachbatchsink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
{quote}
Sample batch type event
{quote}
25/10/29 01:07:26 DEBUG wire: http-outgoing-33 >>
"{"eventTime":"2025-10-28T14:07:20.711Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b25-28f6-7fa0-a4e2-2aaba4f61d7e","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"foreachbatchsink.execute_insert_into_hadoop_fs_relation_command.tests_output_foreachbatchsink","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[{"namespace":"file","name":"/Users/xxxx/venvs/tests/output_foreachbatchsink","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},{"name":"timestamp","type":"timestamp"},{"name":"value","type":"double"},{"name":"year","type":"integer"},{"name":"month","type":"integer"},{"name":"day","type":"integer"}]}},"outputFacets":{"outputStatistics":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet","rowCount":10,"size":13259,"fileCount":10}}}]}"{quote}
h2. What should happen instead
The file sink for Spark Structured Streaming should create an OpenLineage event with valid output details, since that information is present in the query's logical plan. Here is the logical plan for the streaming query using the file sink.
{quote}
== Analyzed Logical Plan ==
id: bigint, name: string, timestamp: timestamp, value: double, year: int, month: int, day: int
~WriteToMicroBatchDataSourceV1 FileSink[file:/Users/xxx/venvs/tests/output_filesink], 753bdc9a-07cd-4788-a17d-27ff622ababc, [checkpointLocation=checkpoint_filesink, path=output_filesink, queryName=filesink], Append, 1
+- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month#6, dayofmonth(cast(timestamp#0 as date)) AS day#7]
   +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month(cast(timestamp#0 as date)) AS month#6]
      +- ~Project [id#2L, name#3, timestamp#0, value#4, year(cast(timestamp#0 as date)) AS year#5]
         +- ~Project [(value#1L % cast(1000 as bigint)) AS id#2L, concat(user_, cast((value#1L % cast(100 as bigint)) as string)) AS name#3, timestamp#0, (rand(-1344458628259366487) * cast(100 as double)) AS value#4]
            +- ~StreamingDataSourceV2ScanRelation[timestamp#0, value#1L] RateStream(rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=12)
{quote}
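To illustrate that the sink location is recoverable at runtime, here is a hypothetical probe (the helper is mine, not part of OpenLineage). It assumes the FileSink[...] node appears in the captured explain output of the running query, as it does in the plan above:
{quote}
import io
import re
from contextlib import redirect_stdout

def file_sink_path(query):
    """Best-effort extraction of the FileSink path from a StreamingQuery.

    StreamingQuery.explain() prints the last micro-batch's plans to stdout,
    so capture that text and pull out the FileSink[...] location if present.
    """
    buf = io.StringIO()
    with redirect_stdout(buf):
        query.explain(extended=True)
    match = re.search(r"FileSink\[(.*?)\]", buf.getvalue())
    return match.group(1) if match else None

# Usage, after the query has processed at least one micro-batch:
# print(file_sink_path(query))
{quote}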
h2. How to reproduce
* Step 1: install PySpark on your local machine:
https://spark.apache.org/docs/latest/api/python/getting_started/install.html
* Step 2: install the OpenLineage server on your local machine:
https://openlineage.io/getting-started
* Step 3: run file_sink.py and foreachbatch_sink.py (attached) with the spark-submit command below. The OpenLineage events appear in the debug logs; a verification sketch follows the command.
{quote}
spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
  --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
  --conf "spark.openlineage.transport.type=http" \
  --conf "spark.openlineage.transport.url=http://localhost:5000" \
  --conf "spark.openlineage.namespace=spark_namespace" \
  --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
  --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
  --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
  [filename].py
{quote}
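To check for the symptom without scraping wire logs, the events can also be written to a local file and scanned. A minimal sketch, assuming OpenLineage's file transport is available (spark.openlineage.transport.type=file plus spark.openlineage.transport.location=<path>; verify against your client version), with one JSON run event per line:
{quote}
import json

def streaming_events_missing_outputs(path):
    """Return job names of STREAMING run events whose outputs list is
    empty -- the symptom reported above."""
    hits = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            event = json.loads(line)
            job = event.get("job", {})
            job_type = job.get("facets", {}).get("jobType", {})
            if job_type.get("processingType") == "STREAMING" and not event.get("outputs"):
                hits.append(job.get("name"))
    return hits

# With the file sink this flags e.g. 'filesink.project'; with foreachBatch the
# STREAMING events are also empty, but the separate BATCH event carries the
# output dataset, as in the sample events above.
print(streaming_events_missing_outputs("openlineage_events.json"))
{quote}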
> Spark Structured Streaming Filesink can not generate open lineage with output
> details
> -------------------------------------------------------------------------------------
>
> Key: SPARK-54071
> URL: https://issues.apache.org/jira/browse/SPARK-54071
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.0.1
> Environment: ### OpenLineage version
> `io.openlineage:openlineage-spark_2.13:1.39.0`
> ### Technology and package versions
> Python: 3.13.3
> Scala: 2.13.16
> Java: OpenJDK 64-Bit Server VM, 17.0.16
> `pip freeze`
> py4j==0.10.9.9
> pyspark==4.0.1
>
> ### spark-submit command
> ```
> spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0
> --conf
> "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
> --conf "spark.openlineage.transport.type=http" --conf
> "spark.openlineage.transport.url=http://localhost:5000" --conf
> "spark.openlineage.namespace=spark_namespace" --conf
> "spark.openlineage.parentJobNamespace=airflow_namespace" --conf
> "spark.openlineage.parentJobName=airflow_dag.airflow_task" --conf
> "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" [filename].py
> ```
> Reporter: Yang Guo
> Priority: Major
> Attachments: file_sink.py, foreachbatch_sink.py