[
https://issues.apache.org/jira/browse/SPARK-54071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18037385#comment-18037385
]
Maxim Martynov commented on SPARK-54071:
----------------------------------------
Why is this a Spark issue, and not an OpenLineage one?
> Spark Structured Streaming file sink cannot generate OpenLineage events with
> output details
> -------------------------------------------------------------------------------------
>
> Key: SPARK-54071
> URL: https://issues.apache.org/jira/browse/SPARK-54071
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.0.1
> Environment: ### OpenLineage version
> `io.openlineage:openlineage-spark_2.13:1.39.0`
> ### Technology and package versions
> Python: 3.13.3
> Scala: 2.13.16
> Java: OpenJDK 64-Bit Server VM, 17.0.16
> `pip freeze`
> py4j==0.10.9.9
> pyspark==4.0.1
>
> ### spark-submit command
> ```
> spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
>   --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
>   --conf "spark.openlineage.transport.type=http" \
>   --conf "spark.openlineage.transport.url=http://localhost:5000" \
>   --conf "spark.openlineage.namespace=spark_namespace" \
>   --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
>   --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
>   --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
>   [filename].py
> ```
> Reporter: Yang Guo
> Priority: Major
> Attachments: file_sink.py, foreachbatch_sink.py
>
>
> h2. Environment details
> h3. OpenLineage version
> {quote}io.openlineage:openlineage-spark_2.13:1.39.0{quote}
> h3. Technology and package versions
> {quote}Python: 3.13.3
> Scala: 2.13.16
> Java: OpenJDK 64-Bit Server VM, 17.0.16
> pip freeze:
> py4j==0.10.9.9
> pyspark==4.0.1{quote}
> For the OpenLineage server setup, I used the default Marquez quickstart:
> {quote}$ git clone https://github.com/MarquezProject/marquez.git && cd marquez
> $ ./docker/up.sh{quote}
> h3. Spark Deployment details
> I used native Spark on my local machine. No managed services were involved.
> h2. Issue details
> When using Spark Structured Streaming to write parquet files to a file system:
> * The file sink only generates an OpenLineage event with the streaming
> processing type, and its output information is empty.
> * The foreachBatch sink generates OpenLineage events with both the streaming
> and batch processing types. The batch processing type event contains valid
> output information.
> The bug is that the file sink in Spark Structured Streaming does not generate
> an OpenLineage event with output details.
> Sample code and sample events follow.
> File sink:
> Sample code:
> {quote}
> query = streaming_df.writeStream \
>     .format('parquet') \
>     .outputMode('append') \
>     .partitionBy('year', 'month', 'day') \
>     .option('checkpointLocation', checkpoint_path) \
>     .option('path', output_path) \
>     .queryName('filesink') \
>     .start()
> {quote}
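> For context, streaming_df is not defined in the snippet above. A minimal
> sketch, reconstructed from the analyzed logical plan further below (rate
> source at 1 row/second; derived id, name, value, year, month, and day
> columns) rather than taken from the attached file_sink.py, could look like:
> {quote}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
>
> spark = SparkSession.builder.appName('filesink').getOrCreate()
>
> # Rate source emitting (timestamp, value) rows, matching
> # RateStream(rowsPerSecond=1) in the logical plan.
> streaming_df = (
>     spark.readStream.format('rate')
>     .option('rowsPerSecond', 1)
>     .load()
>     .withColumn('id', F.col('value') % 1000)
>     .withColumn('name', F.concat(F.lit('user_'),
>                                  (F.col('value') % 100).cast('string')))
>     .withColumn('value', F.rand() * 100)
>     .withColumn('year', F.year('timestamp'))
>     .withColumn('month', F.month('timestamp'))
>     .withColumn('day', F.dayofmonth('timestamp'))
> )
> {quote}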
> Sample event for "processingType":"STREAMING" (note the empty "outputs":[] at the end):
> {quote}
> 25/10/29 00:49:02 DEBUG wire: http-outgoing-52 >>
> "{"eventTime":"2025-10-28T13:45:34.282Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b14-4e9d-7574-95a9-55182f07591d","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":{"namespace":"spark_namespace","name":"filesink"},"root":{"run":{"runId":"019a2b10-5218-7ba3-9cf4-1ce4b1800752"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"filesink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"filesink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"{quote}
> ForeachBatch sink:
> Sample code:
> {quote}
> def write_to_file(batch_df, batch_id):
>     # Skip empty micro-batches; otherwise append the batch as
>     # partitioned parquet.
>     if batch_df.count() > 0:
>         batch_df.write \
>             .mode("append") \
>             .partitionBy("year", "month", "day") \
>             .parquet(output_path)
> {quote}
> {quote}
> query = streaming_df \
>     .writeStream \
>     .outputMode("append") \
>     .foreachBatch(write_to_file) \
>     .option("checkpointLocation", checkpoint_path) \
>     .trigger(processingTime='10 seconds') \
>     .start()
> {quote}
> The above code will generate both streaming and batch processing type events.
> Each micro-batch executes as a regular batch write (an
> InsertIntoHadoopFsRelationCommand, as reflected in the batch event's job
> name), which is why the batch event carries output details.
> Sample streaming type event:
> {quote}
> 25/10/29 01:04:45 DEBUG wire: http-outgoing-1 >>
> "{"eventTime":"2025-10-28T14:04:43.373Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b22-b364-79ca-be87-2d173c25c16c","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"foreachbatchsink.project","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"STREAMING","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[]}"
> {quote}
> Sample batch type event (note the populated "outputs" with dataSource, schema, and outputStatistics facets):
> {quote}
> 25/10/29 01:07:26 DEBUG wire: http-outgoing-33 >>
> "{"eventTime":"2025-10-28T14:07:20.711Z","producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent","eventType":"COMPLETE","run":{"runId":"019a2b25-28f6-7fa0-a4e2-2aaba4f61d7e","facets":{"parent":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet","run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"spark_namespace","name":"foreachbatchsink"},"root":{"run":{"runId":"019a2b22-9e86-770e-8af9-f9136ec6594c"},"job":{"namespace":"airflow_namespace","name":"airflow_dag.airflow_task"}}},"spark_properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","properties":{"spark.master":"local[*]","spark.app.name":"foreachbatchsink"}},"processing_engine":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet","version":"4.0.1","name":"spark","openlineageAdapterVersion":"1.39.0"},"environment-properties":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet","environment-properties":{}}}},"job":{"namespace":"spark_namespace","name":"foreachbatchsink.execute_insert_into_hadoop_fs_relation_command.tests_output_foreachbatchsink","facets":{"jobType":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet","processingType":"BATCH","integration":"SPARK","jobType":"SQL_JOB"}}},"inputs":[],"outputs":[{"namespace":"file","name":"/Users/xxxx/venvs/tests/output_foreachbatchsink","facets":{"dataSource":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet","name":"file","uri":"file"},"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},{"name":"timestamp","type":"timestamp"},{"name":"value","type":"double"},{"name":"year","type":"integer"},{"name":"month","type":"integer"},{"name":"day","type":"integer"}]}},"outputFacets":{"outputStatistics":{"_producer":"https://github.com/OpenLineage/OpenLineage/tree/1.39.0/integration/spark","_schemaURL":"https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet","rowCount":10,"size":13259,"fileCount":10}}}]}"{quote}
> h2. What should happen instead
> The file sink for Spark Structured Streaming should create an OpenLineage
> event with valid output details, since that information is present in the
> streaming query's logical plan. Here is the logical plan for a streaming
> query using the file sink:
> {quote}
> == Analyzed Logical Plan ==
> id: bigint, name: string, timestamp: timestamp, value: double, year: int, month: int, day: int
> ~WriteToMicroBatchDataSourceV1 FileSink[file:/Users/xxx/venvs/tests/output_filesink], 753bdc9a-07cd-4788-a17d-27ff622ababc, [checkpointLocation=checkpoint_filesink, path=output_filesink, queryName=filesink], Append, 1
> +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month#6, dayofmonth(cast(timestamp#0 as date)) AS day#7]
>    +- ~Project [id#2L, name#3, timestamp#0, value#4, year#5, month(cast(timestamp#0 as date)) AS month#6]
>       +- ~Project [id#2L, name#3, timestamp#0, value#4, year(cast(timestamp#0 as date)) AS year#5]
>          +- ~Project [(value#1L % cast(1000 as bigint)) AS id#2L, concat(user_, cast((value#1L % cast(100 as bigint)) as string)) AS name#3, timestamp#0, (rand(-1344458628259366487) * cast(100 as double)) AS value#4]
>             +- ~StreamingDataSourceV2ScanRelation[timestamp#0, value#1L] RateStream(rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=12)
> {quote}
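> As an aside (my addition, not from the original report): once the query has
> processed at least one micro-batch, PySpark can print these plans directly:
> {quote}
> # Prints the plans of the most recent micro-batch execution;
> # extended=True includes the analyzed logical plan shown above.
> query.explain(True)
> {quote}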
> h2. How to reproduce
> * step 1: install pyspark on your local machine:
> https://spark.apache.org/docs/latest/api/python/getting_started/install.html
> * step 2: install an OpenLineage server on your local machine:
> https://openlineage.io/getting-started
> * step 3: use the spark-submit command below to run file_sink.py and
> foreachbatch_sink.py. You will see the OpenLineage events in the debug logs.
> {quote}
> spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
>   --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
>   --conf "spark.openlineage.transport.type=http" \
>   --conf "spark.openlineage.transport.url=http://localhost:5000" \
>   --conf "spark.openlineage.namespace=spark_namespace" \
>   --conf "spark.openlineage.parentJobNamespace=airflow_namespace" \
>   --conf "spark.openlineage.parentJobName=airflow_dag.airflow_task" \
>   --conf "spark.openlineage.parentRunId=xxxx-xxxx-xxxx-xxxx" \
>   [filename].py
> {quote}
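> A note on inspecting the events without Marquez (an assumption on my part,
> not from the attached scripts): the OpenLineage console transport writes
> each event to the driver log instead of posting it over HTTP:
> {quote}
> spark-submit --packages io.openlineage:openlineage-spark_2.13:1.39.0 \
>   --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
>   --conf "spark.openlineage.transport.type=console" \
>   --conf "spark.openlineage.namespace=spark_namespace" \
>   [filename].py
> {quote}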