This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 242581f [SPARK-33440][CORE] Use current timestamp with warning log in
HadoopFSDelegationTokenProvider when the issue date for token is not set up
properly
242581f is described below
commit 242581f4926c994bfc5af388cae31645112b2798
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Tue Dec 1 06:44:15 2020 +0900
[SPARK-33440][CORE] Use current timestamp with warning log in
HadoopFSDelegationTokenProvider when the issue date for token is not set up
properly
### What changes were proposed in this pull request?
This PR proposes to use the current timestamp, with a warning log, when the
issue date of a token is not set up properly. The next section explains the
rationale in detail.
### Why are the changes needed?
Unfortunately, not every implementation respects the `issue date` in
`AbstractDelegationTokenIdentifier`, which Spark relies on when calculating
the renewal interval. The default value of the issue date is 0L, which is far
from the actual issue date. This breaks the calculation of the next renewal
date under some circumstances, leading to a 0 interval (immediate) when
rescheduling token renewal.
In HadoopFSDelegationTokenProvider, Spark calculates the token renewal
interval as below:
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L123-L134
The interval is calculated as `token.renew() - identifier.getIssueDate`,
which gives the correct interval as long as both `token.renew()` and
`identifier.getIssueDate` return correct values, but the result is wrong
when `identifier.getIssueDate` returns 0L (the default value), as shown below:
```
20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal
interval is 1603175657000 for token S3ADelegationToken/IDBroker
20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal
interval is 86400048 for token HDFS_DELEGATION_TOKEN
```
Fortunately, we pick the minimum value as a safety guard (so in this case,
`86400048` is picked), but the safety guard has an unintended negative
effect in this case.
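
As a rough illustration of the two intervals in the log above, here is a minimal sketch that uses plain `Long` values in place of Hadoop's token and identifier types. The object and method names are hypothetical and are not part of Spark.

```scala
// Illustrative only: plain Longs stand in for Hadoop tokens and identifiers.
object RenewalIntervalSketch {
  // Mirrors `token.renew() - identifier.getIssueDate` from the description.
  def renewalInterval(newExpiration: Long, issueDate: Long): Long =
    newExpiration - issueDate

  // Spark keeps the smallest interval across all tokens, if there are any.
  def minInterval(intervals: Seq[Long]): Option[Long] =
    if (intervals.isEmpty) None else Some(intervals.min)

  def main(args: Array[String]): Unit = {
    // An issue date of 0L makes the "interval" equal to the expiration timestamp.
    val broken = renewalInterval(newExpiration = 1603175657000L, issueDate = 0L)
    // 86400048 is the HDFS interval taken from the log above.
    println(minInterval(Seq(broken, 86400048L)))
  }
}
```

The oversized interval is filtered out by the minimum, which is why the problem only becomes visible in the next step.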
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L58-L71
Spark takes the interval calculated above (the "minimum" of the intervals)
and blindly adds it to each token's issue date to calculate the next renewal
date for the token, then picks the "minimum" value again. In the problematic
case, the value would be `86400048` (86400048 + 0), which is far smaller than
the current timestamp.
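
The arithmetic can be sketched as follows, assuming issue dates are given as plain `Long` epoch milliseconds. The names are hypothetical, and the second issue date is an invented example value.

```scala
// Illustrative only: issue dates are plain epoch-millisecond Longs.
object NextRenewalSketch {
  // Adds the shared minimum interval to every issue date and keeps the earliest result.
  def nextRenewalDate(issueDates: Seq[Long], interval: Long): Option[Long] = {
    val dates = issueDates.map(_ + interval)
    if (dates.isEmpty) None else Some(dates.min)
  }

  def main(args: Array[String]): Unit = {
    // One token reports the default issue date 0L; the other reports a hypothetical real one.
    println(nextRenewalDate(Seq(0L, 1602570859000L), interval = 86400048L))
  }
}
```

A single token with an unset issue date is enough to drag the minimum back to the beginning of the epoch.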
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L228-L234
The current timestamp is then subtracted from the next renewal date to get
the interval, which is multiplied by the configured ratio to produce the final
schedule interval. In the problematic case, this value is negative.
https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L180-L188
There's a safety guard that disallows negative values, but it simply clamps
them to 0, meaning "schedule immediately". The immediate run recalculates the
next renewal date and the schedule interval, which leads to the same result,
so the delegation token is updated immediately and continuously.
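
A minimal sketch of the delay calculation and the clamp, following the expressions visible in the diff (`(ratio * (nextRenewal - now)).toLong` and `math.max(0, delay)`). The object name, the ratio of 0.75, and the value of `now` are assumptions chosen for illustration.

```scala
// Illustrative only: reproduces the arithmetic, not Spark's scheduling code.
object RenewalDelaySketch {
  // Same expression as in HadoopDelegationTokenManager: scale the remaining time by the ratio.
  def rawDelay(nextRenewal: Long, now: Long, ratio: Double): Long =
    (ratio * (nextRenewal - now)).toLong

  // The safety guard: negative delays become 0, i.e. "run immediately".
  def clampedDelay(delay: Long): Long = math.max(0L, delay)

  def main(args: Array[String]): Unit = {
    val now = 1602570859000L // hypothetical current time in epoch milliseconds
    val delay = rawDelay(nextRenewal = 86400048L, now = now, ratio = 0.75)
    println(s"raw delay: $delay, scheduled delay: ${clampedDelay(delay)}")
  }
}
```

Because every rerun sees the same next renewal date, the clamped delay stays at 0 and the renewal task keeps rescheduling itself.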
As we fetch the token just before the calculation happens, the actual issue
date is likely only slightly earlier, hence it's not that dangerous to use the
current timestamp as the issue date for a token whose issue date has not been
set up properly. Still, it's better not to leave the token implementation as it
is, so we log a warning message to let end users consult the token implementer.
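
To show the effect of the fallback on the schedule, here is a simplified sketch that omits the logging and takes `now` as a parameter. The names, the ratio of 0.75, and the timestamp are hypothetical; the actual change is the `getIssueDate` helper in the diff.

```scala
// Illustrative only: a logging-free stand-in for the patched behavior.
object IssueDateFallbackSketch {
  // A non-positive issue date is treated as "not set" and replaced with the current time.
  def effectiveIssueDate(issueDate: Long, now: Long): Long =
    if (issueDate > 0L) issueDate else now

  // Next renewal date and delay, computed with the corrected issue date.
  def delay(issueDate: Long, interval: Long, ratio: Double, now: Long): Long = {
    val nextRenewal = effectiveIssueDate(issueDate, now) + interval
    math.max(0L, (ratio * (nextRenewal - now)).toLong)
  }

  def main(args: Array[String]): Unit = {
    val now = 1602570859000L // hypothetical current time in epoch milliseconds
    println(delay(issueDate = 0L, interval = 86400048L, ratio = 0.75, now = now))
  }
}
```

With the fallback in place, the delay becomes a fraction of the renewal interval instead of 0, so the tight loop no longer occurs.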
### Does this PR introduce _any_ user-facing change?
Yes. End users won't encounter the tight loop of token renewal scheduling
after this PR. From the end users' perspective, there's nothing they need to
change.
### How was this patch tested?
Manually tested in a problematic environment.
Closes #30366 from HeartSaVioR/SPARK-33440.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
(cherry picked from commit f5d2165c95fe83f24be9841807613950c1d5d6d0)
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
---
.../security/HadoopDelegationTokenManager.scala | 4 +++-
.../security/HadoopFSDelegationTokenProvider.scala | 27 +++++++++++++++++++---
2 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 3168c76..6ce195b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -178,7 +178,7 @@ private[spark] class HadoopDelegationTokenManager(
private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
- logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
+ logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(_delay)}.")
val renewalTask = new Runnable() {
override def run(): Unit = {
@@ -230,6 +230,8 @@ private[spark] class HadoopDelegationTokenManager(
val now = System.currentTimeMillis
val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
val delay = (ratio * (nextRenewal - now)).toLong
+ logInfo(s"Calculated delay on renewal is $delay, based on next renewal $nextRenewal " +
+ s"and the ratio $ratio, and current time $now")
scheduleRenewal(delay)
creds
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 4e91e72..cd9516b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -63,7 +63,8 @@ private[deploy] class HadoopFSDelegationTokenProvider
val identifier = token
.decodeIdentifier()
.asInstanceOf[AbstractDelegationTokenIdentifier]
- identifier.getIssueDate + interval
+ val tokenKind = token.getKind.toString
+ getIssueDate(tokenKind, identifier) + interval
}
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}
@@ -126,13 +127,33 @@ private[deploy] class HadoopFSDelegationTokenProvider
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+ val tokenKind = token.getKind.toString
+ val interval = newExpiration - getIssueDate(tokenKind, identifier)
+ logInfo(s"Renewal interval is $interval for token $tokenKind")
interval
}.toOption
}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
+
+ private def getIssueDate(kind: String, identifier: AbstractDelegationTokenIdentifier): Long = {
+ val now = System.currentTimeMillis()
+ val issueDate = identifier.getIssueDate
+ if (issueDate > now) {
+ logWarning(s"Token $kind has set up issue date later than current time. (provided: " +
+ s"$issueDate / current timestamp: $now) Please make sure clocks are in sync between " +
+ "machines. If the issue is not a clock mismatch, consult token implementor to check " +
+ "whether issue date is valid.")
+ issueDate
+ } else if (issueDate > 0L) {
+ issueDate
+ } else {
+ logWarning(s"Token $kind has not set up issue date properly. (provided: $issueDate) " +
+ s"Using current timestamp ($now) as issue date instead. Consult token implementor to fix " +
+ "the behavior.")
+ now
+ }
+ }
}
private[deploy] object HadoopFSDelegationTokenProvider {