This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e7b8be37226 [SPARK-39266][CORE] Cleanup unused `spark.rpc.numRetries`
and `spark.rpc.retry.wait` configs
e7b8be37226 is described below
commit e7b8be37226142cd7fcfac82576c46f8f88f74b3
Author: Josh Rosen <[email protected]>
AuthorDate: Mon May 23 21:58:19 2022 -0700
[SPARK-39266][CORE] Cleanup unused `spark.rpc.numRetries` and
`spark.rpc.retry.wait` configs
### What changes were proposed in this pull request?
This PR cleans up the `spark.rpc.numRetries` and `spark.rpc.retry.wait`
configs, both of which are unused.
### Why are the changes needed?
Since SPARK-19450 / #16790 in Spark 2.2.0, both of these configurations are
unused and setting them has no effect. Marking the configs as deprecated and
removing them from the docs helps to avoid user confusion.
In addition, this cleanup slightly improves the performance of constructing
`RpcEndpointRef`s because it removes two unused fields that were initialized by
reading the deprecated configs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
n/a
Closes #36645 from JoshRosen/cleanup-unused-rpc-configs.
Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/SparkConf.scala | 6 ++----
.../org/apache/spark/internal/config/Network.scala | 12 ------------
.../scala/org/apache/spark/rpc/RpcEndpointRef.scala | 2 --
.../src/main/scala/org/apache/spark/util/RpcUtils.scala | 10 ----------
.../test/scala/org/apache/spark/SparkConfSuite.scala | 8 --------
.../test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 4 ----
docs/configuration.md | 17 -----------------
.../sql/execution/streaming/state/StateStoreSuite.scala | 6 +-----
8 files changed, 3 insertions(+), 62 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cf121749b73..00a0f61ab47 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -606,6 +606,8 @@ private[spark] object SparkConf extends Logging {
"Please use the new excludedOnFailure options,
spark.excludeOnFailure.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used anymore"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used anymore"),
+ DeprecatedConfig("spark.rpc.numRetries", "2.2.0", "Not used anymore"),
+ DeprecatedConfig("spark.rpc.retry.wait", "2.2.0", "Not used anymore"),
DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
"Not used anymore. Please use spark.shuffle.service.index.cache.size"),
DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0",
"Not used anymore."),
@@ -682,10 +684,6 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
- RPC_NUM_RETRIES.key -> Seq(
- AlternateConfig("spark.akka.num.retries", "1.4")),
- RPC_RETRY_WAIT.key -> Seq(
- AlternateConfig("spark.akka.retry.wait", "1.4")),
RPC_ASK_TIMEOUT.key -> Seq(
AlternateConfig("spark.akka.askTimeout", "1.4")),
RPC_LOOKUP_TIMEOUT.key -> Seq(
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Network.scala
b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
index 0961d062cc0..56839420839 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Network.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
@@ -92,16 +92,4 @@ private[spark] object Network {
.version("1.6.0")
.intConf
.createOptional
-
- private[spark] val RPC_NUM_RETRIES =
- ConfigBuilder("spark.rpc.numRetries")
- .version("1.4.0")
- .intConf
- .createWithDefault(3)
-
- private[spark] val RPC_RETRY_WAIT =
- ConfigBuilder("spark.rpc.retry.wait")
- .version("1.4.0")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("3s")
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index a3d27b0d099..925dcdba073 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -30,8 +30,6 @@ import org.apache.spark.util.RpcUtils
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
- private[this] val maxRetries = RpcUtils.numRetries(conf)
- private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
/**
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 0e4debc5953..30f5fced5a8 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -36,16 +36,6 @@ private[spark] object RpcUtils {
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
- /** Returns the configured number of times to retry connecting */
- def numRetries(conf: SparkConf): Int = {
- conf.get(RPC_NUM_RETRIES)
- }
-
- /** Returns the configured number of milliseconds to wait on each retry */
- def retryWaitMs(conf: SparkConf): Long = {
- conf.get(RPC_RETRY_WAIT)
- }
-
/** Returns the default Spark timeout to use for RPC ask operations. */
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7779fb2aeaf..7ae1eac1db1 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -284,17 +284,9 @@ class SparkConfSuite extends SparkFunSuite with
LocalSparkContext with ResetSyst
test("akka deprecated configs") {
val conf = new SparkConf()
- assert(!conf.contains(RPC_NUM_RETRIES))
- assert(!conf.contains(RPC_RETRY_WAIT))
assert(!conf.contains(RPC_ASK_TIMEOUT))
assert(!conf.contains(RPC_LOOKUP_TIMEOUT))
- conf.set("spark.akka.num.retries", "1")
- assert(RpcUtils.numRetries(conf) === 1)
-
- conf.set("spark.akka.retry.wait", "2")
- assert(RpcUtils.retryWaitMs(conf) === 2L)
-
conf.set("spark.akka.askTimeout", "3")
assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acd6049a0c6..1681bf9b57b 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -171,8 +171,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with
BeforeAndAfterAll {
val conf = new SparkConf()
val shortProp = "spark.rpc.short.timeout"
- conf.set(Network.RPC_RETRY_WAIT, 0L)
- conf.set(Network.RPC_NUM_RETRIES, 1)
val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address,
"ask-timeout")
@@ -203,8 +201,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with
BeforeAndAfterAll {
val conf = new SparkConf()
val shortProp = "spark.rpc.short.timeout"
- conf.set(Network.RPC_RETRY_WAIT, 0L)
- conf.set(Network.RPC_NUM_RETRIES, 1)
val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-abort")
diff --git a/docs/configuration.md b/docs/configuration.md
index 3374be02606..e01558a309a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2090,23 +2090,6 @@ Apart from these, the following properties are also
available, and may be useful
</td>
<td>1.1.1</td>
</tr>
-<tr>
- <td><code>spark.rpc.numRetries</code></td>
- <td>3</td>
- <td>
- Number of times to retry before an RPC task gives up.
- An RPC task will run at most times of this number.
- </td>
- <td>1.4.0</td>
-</tr>
-<tr>
- <td><code>spark.rpc.retry.wait</code></td>
- <td>3s</td>
- <td>
- Duration for an RPC ask operation to wait before retrying.
- </td>
- <td>1.4.0</td>
-</tr>
<tr>
<td><code>spark.rpc.askTimeout</code></td>
<td><code>spark.network.timeout</code></td>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dde925bb2d9..f74a7a0aabb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -35,7 +35,6 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.LocalSparkContext._
-import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow,
UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.quietly
@@ -250,9 +249,6 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
- // Make sure that when SparkContext stops, the StateStore maintenance
thread 'quickly'
- // fails to talk to the StateStoreCoordinator and unloads all the
StateStores
- .set(RPC_NUM_RETRIES, 1)
val opId = 0
val dir1 = newDir()
val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0),
UUID.randomUUID)
@@ -1178,7 +1174,7 @@ abstract class StateStoreSuiteBase[ProviderClass <:
StateStoreProvider]
assert(metricNew.desc === "new desc", "incorrect description in copied
instance")
assert(metricNew.name === "m1", "incorrect name in copied instance")
- val conf = new
SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1)
+ val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763")
withSpark(new SparkContext(conf)) { sc =>
val sqlMetric = metric.createSQLMetric(sc)
assert(sqlMetric != null)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]