This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 98cb0cd [SPARK-33635][SS] Adjust the order of check in
KafkaTokenUtil.needTokenUpdate to remedy perf regression
98cb0cd is described below
commit 98cb0cda4101111130a6c9bc244632acd02826a3
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Tue Jan 5 21:59:49 2021 -0800
[SPARK-33635][SS] Adjust the order of check in
KafkaTokenUtil.needTokenUpdate to remedy perf regression
### What changes were proposed in this pull request?
This PR proposes to adjust the order of check in
KafkaTokenUtil.needTokenUpdate, so that short-circuit applies on the
non-delegation token cases (insecure + secured without delegation token) and
remedies the performance regression heavily.
### Why are the changes needed?
There's a serious performance regression between Spark 2.4 vs Spark 3.0 on
read path against Kafka data source.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually ran a reproducer
(https://github.com/codegorillauk/spark-kafka-read with modification to just
count instead of writing to Kafka topic) with measuring the time.
> the branch applying the change with adding measurement
https://github.com/HeartSaVioR/spark/commits/debug-SPARK-33635-v3.0.1
> the branch only adding measurement
https://github.com/HeartSaVioR/spark/commits/debug-original-ver-SPARK-33635-v3.0.1
> the result (before the fix)
count: 10280000
Took 41.634007047 secs
21/01/06 13:16:07 INFO KafkaDataConsumer: debug ver. 17-original
21/01/06 13:16:07 INFO KafkaDataConsumer: Total time taken to retrieve:
82118 ms
> the result (after the fix)
count: 10280000
Took 7.964058475 secs
21/01/06 13:08:22 INFO KafkaDataConsumer: debug ver. 17
21/01/06 13:08:22 INFO KafkaDataConsumer: Total time taken to retrieve: 987
ms
Closes #31056 from HeartSaVioR/SPARK-33635.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit fa9309001a47a2b87f7a735f964537886ed9bd4c)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 49109d3..370e727 100644
---
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -295,8 +295,8 @@ private[spark] object KafkaTokenUtil extends Logging {
sparkConf: SparkConf,
params: ju.Map[String, Object],
clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
- if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
- clusterConfig.isDefined &&
params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+ if (clusterConfig.isDefined &&
params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
+ HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
logDebug("Delegation token used by connector, checking if uses the
latest token.")
val connectorJaasParams =
params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
getTokenJaasParams(clusterConfig.get) != connectorJaasParams
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]