[ https://issues.apache.org/jira/browse/SPARK-46990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-46990.
---------------------------------
Fix Version/s: 4.0.0
Resolution: Fixed
Issue resolved by pull request 45578
[https://github.com/apache/spark/pull/45578]
> Regression: Unable to load empty avro files emitted by event-hubs
> -----------------------------------------------------------------
>
> Key: SPARK-46990
> URL: https://issues.apache.org/jira/browse/SPARK-46990
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.5.0
> Environment: Databricks 14.0 - 14.3 (spark 3.5.0)
> Reporter: Kamil Kandzia
> Assignee: Ivan Sadikov
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.0.0
>
> Attachments: second=02.avro
>
>
> In Azure, I use Databricks with Event Hubs. Up to Spark 3.4.1 (Databricks
> Runtime 13.3 LTS), the empty Avro files emitted by Event Hubs could be read.
> Since Spark 3.5.0, these files can no longer be loaded: even when only one of
> several Avro files being loaded is empty, operations such as count or save
> cannot complete. I tested this on Databricks Runtime 14.0, 14.1, 14.2 and
> 14.3, and it fails on all of them.
> I use the following code:
>
> {code:python}
> df = spark.read.format("avro") \
>     .load('abfss://<container>@<storage>.dfs.core.windows.net/<evh-namespace>/<evh>/0/2024/02/05/22/46/10.avro')
>
> df.count()  # Spark hangs on this operation
> {code}
> Below is a fragment of the Databricks logs and the query plan:
> {code:java}
> 24/02/06 10:03:10 INFO ProgressReporter$: Added result fetcher for 2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
> 24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 1; threshold: 32
> 24/02/06 10:03:11 INFO InMemoryFileIndex: Start listing leaf files and directories. Size of Paths: 0; threshold: 32
> 24/02/06 10:03:11 INFO InMemoryFileIndex: It took 9 ms to list leaf files for 1 paths.
> 24/02/06 10:03:11 INFO ProgressReporter$: Removed result fetcher for 2734305632140666820_7640723027790427455_4f56f528d4a44796a98821713778d5f9
> 24/02/06 10:03:12 INFO ProgressReporter$: Added result fetcher for 2734305632140666820_6526693737104909881_a07acddb350f44a284cac52db0b2fb21
> 24/02/06 10:03:12 INFO ClusterLoadMonitor: Added query with execution ID:38. Current active queries:1
> 24/02/06 10:03:12 INFO FileSourceStrategy: Pushed Filters:
> 24/02/06 10:03:12 INFO FileSourceStrategy: Post-Scan Filters:
> 24/02/06 10:03:12 INFO CodeGenerator: Code generated in 10.636308 ms
> 24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34 stored as values in memory (estimated size 409.5 KiB, free 3.3 GiB)
> 24/02/06 10:03:12 INFO MemoryStore: Block broadcast_34_piece0 stored as bytes in memory (estimated size 14.5 KiB, free 3.3 GiB)
> 24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on <IP_ADDRESS_2>:43781 (size: 14.5 KiB, free: 3.3 GiB)
> 24/02/06 10:03:12 INFO SparkContext: Created broadcast 34 from $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63
> 24/02/06 10:03:12 INFO FileSourceScanExec: Planning scan with bin packing, max split size: 4194304 bytes, max partition size: 4194304, open cost is considered as scanning 4194304 bytes.
> 24/02/06 10:03:12 INFO DAGScheduler: Registering RDD 104 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) as input to shuffle 11
> 24/02/06 10:03:12 INFO DAGScheduler: Got map stage job 22 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) with 1 output partitions
> 24/02/06 10:03:12 INFO DAGScheduler: Final stage: ShuffleMapStage 31 ($anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63)
> 24/02/06 10:03:12 INFO DAGScheduler: Parents of final stage: List()
> 24/02/06 10:03:12 INFO DAGScheduler: Missing parents: List()
> 24/02/06 10:03:12 INFO DAGScheduler: Submitting ShuffleMapStage 31 (MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63), which has no missing parents
> 24/02/06 10:03:12 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 31 (MapPartitionsRDD[104] at $anonfun$withThreadLocalCaptured$5 at LexicalThreadLocal.scala:63) (first 15 tasks are for partitions Vector(0))
> 24/02/06 10:03:12 INFO TaskSchedulerImpl: Adding task set 31.0 with 1 tasks resource profile 0
> 24/02/06 10:03:12 INFO TaskSetManager: TaskSet 31.0 using PreferredLocationsV1
> 24/02/06 10:03:12 WARN FairSchedulableBuilder: A job was submitted with scheduler pool 2734305632140666820, which has not been configured. This can happen when the file that pools are read from isn't set, or when that file doesn't contain 2734305632140666820. Created 2734305632140666820 with default configuration (schedulingMode: FIFO, minShare: 0, weight: 1)
> 24/02/06 10:03:12 INFO FairSchedulableBuilder: Added task set TaskSet_31.0 tasks to pool 2734305632140666820
> 24/02/06 10:03:12 INFO TaskSetManager: Starting task 0.0 in stage 31.0 (TID 449) (<IP_ADDRESS>, executor 3, partition 0, PROCESS_LOCAL,
> 24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35 stored as values in memory (estimated size 137.2 KiB, free 3.3 GiB)
> 24/02/06 10:03:12 INFO MemoryStore: Block broadcast_35_piece0 stored as bytes in memory (estimated size 41.3 KiB, free 3.3 GiB)
> 24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on <IP_ADDRESS_2>:43781 (size: 41.3 KiB, free: 3.3 GiB)
> 24/02/06 10:03:12 INFO SparkContext: Created broadcast 35 from broadcast at TaskSetManager.scala:723
> 24/02/06 10:03:12 INFO BlockManagerInfo: Added broadcast_35_piece0 in memory on <IP_ADDRESS>:40825 (size: 41.3 KiB, free: 3.6 GiB)
> 24/02/06 10:03:13 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on <IP_ADDRESS>:40825 (size: 17.6 KiB, free: 3.6 GiB)
> 24/02/06 10:03:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 0.0, New Ema: 1.0
> 24/02/06 10:03:15 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on <IP_ADDRESS>:40825 (size: 14.5 KiB, free: 3.6 GiB)
> 24/02/06 10:03:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:03:58 INFO DataSourceFactory$: DataSource Jdbc URL: jdbc:mariadb://<DELETED FOR JIRA PURPOSES>
> 24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Starting...
> 24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Start completed.
> 24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
> 24/02/06 10:03:58 INFO HikariDataSource: metastore-monitor - Shutdown completed.
> 24/02/06 10:03:58 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 302 milliseconds)
> 24/02/06 10:03:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:13 INFO HiveMetaStore: 1: get_database: default
> 24/02/06 10:04:13 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: default
> 24/02/06 10:04:13 INFO DriverCorral: DBFS health check ok
> 24/02/06 10:04:13 INFO DriverCorral: Metastore health check ok
> 24/02/06 10:04:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:44 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:47 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:50 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:53 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:56 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:04:59 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:02 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:05 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:08 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:11 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:14 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:17 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:20 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:23 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:26 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:29 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:32 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:35 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:38 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> 24/02/06 10:05:41 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0
> == Parsed Logical Plan ==
> Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro
>
> == Analyzed Logical Plan ==
> SequenceNumber: bigint, Offset: string, EnqueuedTimeUtc: string, SystemProperties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Properties: map<string,struct<member0:bigint,member1:double,member2:string,member3:binary>>, Body: binary
> Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro
>
> == Optimized Logical Plan ==
> Relation [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] avro
>
> == Physical Plan ==
> FileScan avro [SequenceNumber#451L,Offset#452,EnqueuedTimeUtc#453,SystemProperties#454,Properties#455,Body#456] Batched: false, DataFilters: [], Format: Avro, Location: InMemoryFileIndex(1 paths)[abfss://<container>@<storage>.dfs.core..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<SequenceNumber:bigint,Offset:string,EnqueuedTimeUtc:string,SystemProperties:map<string,str...
> {code}
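A possible interim workaround on 3.5.x (not part of the upstream fix, which landed in 4.0.0 via pull request 45578) is to exclude header-only files before passing paths to `spark.read.format("avro").load(...)`. The sketch below assumes that an "empty" Event Hubs capture file is a valid Avro object container file with a header (magic `Obj\x01`, metadata map, 16-byte sync marker) but no data blocks, following the layout in the Avro specification. The function `avro_has_data_blocks` is a hypothetical helper, not a Spark or Avro library API, and it has not been checked against the attached `second=02.avro`.

```python
import io

MAGIC = b"Obj\x01"  # Avro object container file magic bytes
SYNC_SIZE = 16      # length of the sync marker that ends the header and each block


def _read_long(buf):
    """Decode one Avro zig-zag varint long from a binary stream."""
    shift = 0
    acc = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise ValueError("truncated varint")
        b = byte[0]
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)


def _skip_bytes(buf):
    """Skip an Avro 'bytes'/'string' value (length prefix followed by data)."""
    buf.read(_read_long(buf))


def avro_has_data_blocks(data: bytes) -> bool:
    """Hypothetical helper: True if the container file holds at least one record."""
    buf = io.BytesIO(data)
    if buf.read(4) != MAGIC:
        raise ValueError("not an Avro object container file")
    # Header metadata is a map<bytes>: blocks of key/value pairs ending with count 0.
    while True:
        count = _read_long(buf)
        if count == 0:
            break
        if count < 0:  # negative count: a block byte size follows and can be ignored
            count = -count
            _read_long(buf)
        for _ in range(count):
            _skip_bytes(buf)  # key
            _skip_bytes(buf)  # value
    if len(buf.read(SYNC_SIZE)) != SYNC_SIZE:
        raise ValueError("truncated header")
    # Each data block: object count, byte size (after any codec), payload, sync marker.
    while True:
        if not buf.read(1):
            return False  # reached EOF without a non-empty block
        buf.seek(-1, io.SEEK_CUR)
        count = _read_long(buf)
        size = _read_long(buf)
        buf.seek(size + SYNC_SIZE, io.SEEK_CUR)
        if count > 0:
            return True
```

Callers could obtain each file's bytes (for example from a local copy, or from the `content` column of Spark's `binaryFile` source) and keep only the paths for which the helper returns `True`. Skipping the payload by its declared size means the check works regardless of the compression codec.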
--
This message was sent by Atlassian Jira
(v8.20.10#820010)