Kamil Kandzia created SPARK-46990:
-------------------------------------

             Summary: Regression: Unable to load empty avro files emitted by event-hubs
                 Key: SPARK-46990
                 URL: https://issues.apache.org/jira/browse/SPARK-46990
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.0
         Environment: Databricks 14.0 - 14.3 (spark 3.5.0)
            Reporter: Kamil Kandzia


On Azure, I use Databricks together with Event Hubs. Up to Spark 3.4.1 
(Databricks 13.3 LTS), empty Avro files emitted by Event Hubs could be read. 
Since Spark 3.5.0 these files can no longer be loaded: even when several Avro 
files are loaded and only one of them is empty, actions such as count or save 
cannot be performed. I tested Databricks 14.0, 14.1, 14.2, and 14.3, and the 
problem occurs in all of them.

I use the following code:
 
{code:python}
df = (
    spark.read.format("avro")
    .load('abfss://<container>@<storage>.dfs.core.windows.net/<evh-namespace>/<evh>/0/2024/02/05/22/46/10.avro')
)

df.count()  # Spark hangs on this operation
{code}
Below is a fragment of the Databricks logs, followed by the query plan:
{code:java}
24/02/06 10:03:10 INFO ProgressReporter$: Added result fetcher for 
2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and 
directories. Size of Paths: 1; threshold: 32
24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and 
directories. Size of Paths: 0; threshold: 32
24/02/06 10:03:11 INFO InMemoryFileIndex: It took 9 ms to list leaf files for 1 
paths.
24/02/06 10:03:11 INFO ProgressReporter$: Removed result fetcher for 
2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
24/02/06 10:03:12 INFO ProgressReporter$: Added result fetcher for 
2734305632140666820_6526693737104909881_a07acddb350f44a284cac52db0b2fb21
24/02/06 10:03:12 INFO ClusterLoadMonitor: Added query with execution ID:38. 
Current active queries:1
24/02/06 10:03:12 INFO FileSourceStrategy: Pushed Filters: 
24/02/06 10:03:12 INFO FileSourceStrategy: Post-Scan Filters: 
24/02/06 10:03:12 INFO CodeGenerator: Code generated in 10.636308 ms
24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34 stored as values in 
memory (estimated size 409.5 KiB, free 3.3 GiB)
24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34_piece0 stored as bytes 
in memory (estimated size 14.5 KiB, free 3.3 GiB)
24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on 
<IP_ADDRESS_2>:43781 (size: 14.5 KiB, free: 3.3 GiB)
24/02/06 10:03:12 INFO SparkContext: Created broadcast 34 from 
$anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63
24/02/06 10:03:12 INFO FileSourceScanExec: Planning scan with bin packing, max 
split size: 4194304 bytes, max partition size: 4194304, open cost is considered 
as scanning 4194304 bytes.
24/02/06 10:03:12 INFO DAGScheduler: Registering RDD 104 
($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) as input to 
shuffle 11
24/02/06 10:03:12 INFO DAGScheduler: Got map stage job 22 
($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) with 1 
output partitions
24/02/06 10:03:12 INFO DAGScheduler: Final stage: ShuffleMapStage 31 
($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63)
24/02/06 10:03:12 INFO DAGScheduler: Parents of final stage: List()
24/02/06 10:03:12 INFO DAGScheduler: Missing parents: List()
24/02/06 10:03:12 INFO DAGScheduler: Submitting ShuffleMapStage 31 
(MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 at 
LexicalThreadLocal.scala:63), which has no missing parents
24/02/06 10:03:12 INFO DAGScheduler: Submitting 1 missing tasks from 
ShuffleMapStage 31 (MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 
at LexicalThreadLocal.scala:63) (first 15 tasks are for partitions Vector(0))
24/02/06 10:03:12 INFO TaskSchedulerImpl: Adding task set 31.0 with 1 tasks 
resource profile 0
24/02/06 10:03:12 INFO TaskSetManager: TaskSet 31.0 using PreferredLocationsV1
24/02/06 10:03:12 WARN FairSchedulableBuilder: A job was submitted with 
scheduler pool 2734305632140666820, which has not been configured. This can 
happen when the file that pools are read from isn't set, or when that file 
doesn't contain 2734305632140666820. Created 2734305632140666820 with default 
configuration (schedulingMode: FIFO, minShare: 0, weight: 1)
24/02/06 10:03:12 INFO FairSchedulableBuilder: Added task set TaskSet_31.0 
tasks to pool 2734305632140666820
24/02/06 10:03:12 INFO TaskSetManager: Starting task 0.0 in stage 31.0 (TID 
449) (<IP_ADDRESS>, executor 3, partition 0, PROCESS_LOCAL, 
24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35 stored as values in 
memory (estimated size 137.2 KiB, free 3.3 GiB)
24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35_piece0 stored as bytes 
in memory (estimated size 41.3 KiB, free 3.3 GiB)
24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on 
<IP_ADDRESS_2>:43781 (size: 41.3 KiB, free: 3.3 GiB)
24/02/06 10:03:12 INFO SparkContext: Created broadcast 35 from broadcast at 
TaskSetManager.scala:723
24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on 
<IP_ADDRESS>:40825 (size: 41.3 KiB, free: 3.6 GiB)
24/02/06 10:03:13 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 
<IP_ADDRESS>:40825 (size: 17.6 KiB, free: 3.6 GiB)
24/02/06 10:03:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
0.0, New Ema: 1.0 
24/02/06 10:03:15 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on 
<IP_ADDRESS>:40825 (size: 14.5 KiB, free: 3.6 GiB)
24/02/06 10:03:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:03:58 INFO DataSourceFactory$: DataSource Jdbc URL: 
jdbc:mariadb://<DELETED FOR JIRA PURPOSES>
24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Starting...
24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Start completed.
24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown 
initiated...
24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown completed.
24/02/06 10:03:58 INFO MetastoreMonitor: Metastore healthcheck successful 
(connection duration = 302 milliseconds)
24/02/06 10:03:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:13 INFO HiveMetaStore: 1: get_database: default
24/02/06 10:04:13 INFO audit: ugi=root    ip=unknown-ip-addr    
cmd=get_database: default    
24/02/06 10:04:13 INFO DriverCorral: DBFS health check ok
24/02/06 10:04:13 INFO DriverCorral: Metastore health check ok
24/02/06 10:04:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:04:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 
24/02/06 10:05:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 
1.0, New Ema: 1.0 

== Parsed Logical Plan ==
Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

== Analyzed Logical Plan ==
SequenceNumber: bigint, Offset: string, EnqueuedTimeUtc: string, SystemProperties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Properties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Body: binary
Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

== Optimized Logical Plan ==
Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro

== Physical Plan ==
FileScan avro [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] Batched: false, DataFilters: [], Format: Avro, Location: InMemoryFileIndex(1 paths)[abfss://<container>@<storage>.dfs.core..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<SequenceNumber:bigint,Offset:string,EnqueuedTimeUtc:string,SystemProperties:map<string,str...
{code}


