[
https://issues.apache.org/jira/browse/SPARK-38062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan resolved SPARK-38062.
---------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed
Issue resolved by pull request 35358
[https://github.com/apache/spark/pull/35358]
> FallbackStorage shouldn't attempt to resolve arbitrary "remote" hostname
> ------------------------------------------------------------------------
>
> Key: SPARK-38062
> URL: https://issues.apache.org/jira/browse/SPARK-38062
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Erik Krogen
> Assignee: Erik Krogen
> Priority: Major
> Fix For: 3.3.0
>
>
> {{FallbackStorage}} uses a placeholder block manager ID:
> {code:scala}
> private[spark] object FallbackStorage extends Logging {
>   /** We use one block manager id as a place holder. */
>   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
> {code}
> That second argument is normally interpreted as a hostname, but is passed as
> the string "remote" in this case.
> {{BlockManager}} will consider this placeholder as one of the peers in some
> cases:
> {code:language=scala|title=BlockManager.scala}
> private[storage] def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
>   peerFetchLock.synchronized {
>     ...
>     if (cachedPeers.isEmpty &&
>         conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
>       Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
>     } else {
>       cachedPeers
>     }
>   }
> }
> {code}
> {{BlockManagerDecommissioner.ShuffleMigrationRunnable}} will then attempt to
> perform an upload to this placeholder ID:
> {code:scala}
> try {
>   blocks.foreach { case (blockId, buffer) =>
>     logDebug(s"Migrating sub-block ${blockId}")
>     bm.blockTransferService.uploadBlockSync(
>       peer.host,
>       peer.port,
>       peer.executorId,
>       blockId,
>       buffer,
>       StorageLevel.DISK_ONLY,
>       null) // class tag, we don't need for shuffle
>     logDebug(s"Migrated sub-block $blockId")
>   }
>   logInfo(s"Migrated $shuffleBlockInfo to $peer")
> } catch {
>   case e: IOException =>
>     ...
>     if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).size < blocks.size) {
>       logWarning(s"Skipping block $shuffleBlockInfo, block deleted.")
>     } else if (fallbackStorage.isDefined) {
>       fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
>     } else {
>       logError(s"Error occurred during migrating $shuffleBlockInfo", e)
>       keepRunning = false
>     }
> {code}
> Since "remote" is not expected to be a resolvable hostname, an
> {{IOException}} occurs, and {{fallbackStorage}} is used. But we shouldn't
> try to resolve it in the first place: treating the placeholder ID as a
> resolvable hostname, and relying on an exception to discover that
> {{fallbackStorage}} is needed, is unnecessary and strange.
> To make matters worse, in some network environments, "remote" may be a
> resolvable hostname, completely breaking this functionality. In the
> particular environment that I use for running automated tests, there is a DNS
> entry for "remote" which, when you attempt to connect to it, will hang for a
> long period of time. This essentially hangs the executor decommission
> process, and in the case of unit tests, breaks {{FallbackStorageSuite}} as it
> exceeds its timeouts. I'm not sure, but it's possible this is related to
> SPARK-35584 as well (if, in the GA environment, the OS sometimes takes a
> long time to decide that "remote" is not a valid hostname).
> We shouldn't attempt to treat this placeholder ID as a real hostname.
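
One way to read the request above is that the migration code should recognize the placeholder before any network call is made, rather than waiting for an {{IOException}}. The following is only a minimal sketch of that idea, not the change made in the pull request. `PeerId`, `Destination`, `peers`, and `destinationFor` are hypothetical stand-ins introduced here for illustration; they are not Spark classes or methods.

```scala
// Hypothetical, simplified stand-ins for Spark's types; NOT the real Spark API.
object PlaceholderSketch {

  // Stand-in for BlockManagerId: (executorId, host, port).
  final case class PeerId(executorId: String, host: String, port: Int)

  // Mirrors the placeholder quoted in the issue: ("fallback", "remote", 7337).
  val FallbackPeer: PeerId = PeerId("fallback", "remote", 7337)

  // Where a shuffle block should be sent.
  sealed trait Destination
  final case class UploadToPeer(host: String, port: Int) extends Destination
  case object CopyToFallbackStorage extends Destination

  // Mirrors the quoted getPeers logic: with no cached peers and a fallback
  // path configured, the placeholder is the only "peer" returned.
  def peers(cachedPeers: Seq[PeerId], fallbackConfigured: Boolean): Seq[PeerId] =
    if (cachedPeers.isEmpty && fallbackConfigured) Seq(FallbackPeer) else cachedPeers

  // Decide the destination up front. The placeholder is matched by equality,
  // so its host string "remote" is never handed to a resolver or socket.
  def destinationFor(peer: PeerId, fallbackConfigured: Boolean): Destination =
    if (fallbackConfigured && peer == FallbackPeer) CopyToFallbackStorage
    else UploadToPeer(peer.host, peer.port)
}
```

With this shape, a DNS entry for "remote" has no effect on the decision, because the placeholder's host field is never resolved; real peers still go through the normal upload path.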
--
This message was sent by Atlassian Jira
(v8.20.1#820001)