[ 
https://issues.apache.org/jira/browse/SPARK-49515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18022584#comment-18022584
 ] 

Eunjin Song commented on SPARK-49515:
-------------------------------------

This was reported by one of our customers (Azure Synapse), and two other 
customers have complained about the same metric. I provided the following 
config as a mitigation.

spark.conf.set("spark.sql.optimizer.excludedRules", 
"org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation")
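For completeness, a sketch of the mitigation applied end-to-end. The schema, paths, and variable names below are illustrative placeholders, not the customer's actual job:

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Exclude PropagateEmptyRelation so empty relations stay in the physical
// plan and per-source metrics line up with the logical sources.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation")

// Placeholder schema and paths for illustration only.
val sch = StructType(Seq(StructField("value", IntegerType)))
val nonEmpty = spark.readStream.schema(sch).parquet("/streaming/non-empty")
val empty    = spark.readStream.schema(sch).parquet("/streaming/empty")

nonEmpty.union(empty).writeStream
  .option("checkpointLocation", "/streaming/checkpoint")
  .format("parquet")
  .option("path", "/streaming/output")
  .trigger(Trigger.Once())
  .start()
```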


It works in Spark 3.5.1:
2025-09-24 23:46:00,588 INFO MicroBatchExecution [stream execution thread for 
[id = 5c27f91c-f294-48a9-b0c5-5b2be4c5a8f9, runId = 
d35e7466-3622-4490-9a56-cb8e90db70aa]]: Streaming query made progress: {
  "id" : "5c27f91c-f294-48a9-b0c5-5b2be4c5a8f9",
  "runId" : "d35e7466-3622-4490-9a56-cb8e90db70aa",
  "name" : null,
  "timestamp" : "2025-09-24T23:45:52.648Z",
  "batchId" : 0,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 1.267427122940431,
  "durationMs" : {
    "addBatch" : 4888,
    "commitOffsets" : 196,
    "getBatch" : 1488,
    "latestOffset" : 724,
    "queryPlanning" : 229,
    "triggerExecution" : 7889,
    "walCommit" : 258
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : 
"FileStreamSource[abfss://28d877b2-236f-4776-96b1-00558dc33...@msit-onelake.dfs.fabric.microsoft.com/0a62e703-78e4-491f-872b-8f8e6d9adc03/Files/non-empty]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "latestOffset" : null,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 1.267427122940431
  }, {
    "description" : 
"FileStreamSource[abfss://28d877b2-236f-4776-96b1-00558dc33...@msit-onelake.dfs.fabric.microsoft.com/0a62e703-78e4-491f-872b-8f8e6d9adc03/Files/empty]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[Files/output]",
    "numOutputRows" : -1
  }
}
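As a side note, the per-source figures in the progress JSON above can also be read programmatically after the batch completes. A minimal sketch, assuming `query` is the StreamingQuery handle returned by start():

```scala
// Inspect the last reported progress; each source exposes its own
// numInputRows, so a source whose rows were dropped from the metrics
// shows up as 0 here.
val progress = query.lastProgress
progress.sources.foreach { s =>
  println(s"${s.description}: numInputRows=${s.numInputRows}")
}
```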

> numInputRows is 0 when one of input relation is empty
> -----------------------------------------------------
>
>                 Key: SPARK-49515
>                 URL: https://issues.apache.org/jira/browse/SPARK-49515
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>         Environment: Reproducible in Spark 3.4, and probably in all versions 
> since org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation was 
> introduced
>  
>            Reporter: Eunjin Song
>            Priority: Trivial
>         Attachments: Query 0.png, Query 1.png, Query 2.png
>
>
> [https://github.com/apache/spark/blob/2ed6c3e511f322c5fd01953736c376a85ff2c687/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L481]
>  
> When the number of relations in the logical plan differs from the number of 
> relations in the physical plan, numInputRows is captured as 0 for all 
> sources. This includes the case where empty relations in a union are removed 
> from the physical plan by 
> "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation".
>  
> Repro code:
> import org.apache.spark.sql.streaming.Trigger
> val df1 = spark.readStream.schema(sch).parquet("/streaming/parquet1")
> val df2 = spark.readStream.schema(sch).parquet("/streaming/emptyparquet")
> df1.union(df2).writeStream
>   .option("checkpointLocation", "/streaming/checkpoint8")
>   .format("parquet")
>   .trigger(Trigger.Once())
>   .option("path", "/streaming/result9")
>   .start()
>  
> workaround:
> spark.conf.set("spark.sql.optimizer.excludedRules","org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation")



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
